From 0f64693ec900f10aa5a98abd949b80d10d0bad96 Mon Sep 17 00:00:00 2001 From: Rohit-R-Rao Date: Thu, 15 May 2025 18:45:31 -0500 Subject: [PATCH 1/3] creep code only --- GEMstack/onboard/planning/creep_planning.py | 229 ++++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 GEMstack/onboard/planning/creep_planning.py diff --git a/GEMstack/onboard/planning/creep_planning.py b/GEMstack/onboard/planning/creep_planning.py new file mode 100644 index 00000000..3fff736a --- /dev/null +++ b/GEMstack/onboard/planning/creep_planning.py @@ -0,0 +1,229 @@ +# File: GEMstack/onboard/planning/creep_planning.py +from typing import List, Tuple +import math +from ..component import Component +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ + ObjectFrameEnum +from ...utils import serialization +from ...mathutils import transforms +import numpy as np +from .parking_motion_planning import longitudinal_plan, longitudinal_brake + + +DEBUG = False # Set to False to disable debug output + + +''' +# OTHER CREEP CONTROL CODE + +def inverse_speed_function(distance_to_object): + +# max_creep_speed = 1.0 # Maximum speed when creeping (m/s) +# min_creep_speed = 0.2 # Minimum speed to maintain (m/s) +# safe_distance = 5.0 # Distance at which to start slowing down (m) + +# # Almost stop when very close (≤0.25m) +# if distance_to_object <= 0.25: +# return 0.0 # Practically zero speed + +# # Slow creep when close (≤1.5m) +# if distance_to_object <= 3.0: +# return min_creep_speed +# speed = min_creep_speed + (max_creep_speed - min_creep_speed) * (distance_to_object / safe_distance) +# return min(speed, max_creep_speed) + +def pid_speed_control(distance_to_object, target_distance, current_speed, prev_error, integral, dt, + kp=0.5, ki=0.1, kd=0.05, min_speed=0.2, max_speed=1.0): + if distance_to_object <= 0.25: + return 0.0, 0.0, 0.0 + error = distance_to_object - target_distance + integral += error * dt + integral = max(-5.0, min(integral, 5.0)) # Prevent integral windup + derivative = (error - prev_error) / dt if dt > 0 else 0 + p_term = kp * error + i_term = ki * integral + d_term = kd * derivative + speed_adjustment = p_term + i_term + d_term + base_speed = 0.5 # Base creep speed + target_speed = base_speed + speed_adjustment + target_speed = max(min_speed, min(target_speed, max_speed)) + return target_speed, error, integral + + +''' + + + +class CreepTrajectoryPlanner(Component): + """Follows the given route. Brakes if the ego–vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. + """ + + def __init__(self): + self.route_progress = None + self.t_last = None + self.acceleration = 5 + self.desired_speed = 2.0 + self.deceleration = 2.0 + self.emergency_brake = 8.0 + # Parameters for end-of-route linear deceleration + self.end_stop_distance = 12.5 # Distance in meters to start linear deceleration + # Did 15, 10, 7.5, 17.5, 20, 12.5 + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['trajectory'] + + def rate(self): + return 10.0 + + def check_end_of_route(self, route, current_parameter): + """Check if vehicle is approaching the end of the route and calculate + appropriate linear deceleration parameters. + + Args: + route: The complete route + current_parameter: Current position along the route + + Returns: + (bool, float): Tuple containing: + - Whether the vehicle should start decelerating + - Adjusted speed if deceleration needed, otherwise desired_speed + """ + route_length = route.length() + + distance_remaining = route_length - current_parameter + + if DEBUG: + print(f"[DEBUG] check_end_of_route: Route length = {route_length}, " + f"Current position = {current_parameter}, " + f"Distance remaining = {distance_remaining}") + + if distance_remaining <= self.end_stop_distance: + if distance_remaining > 0.1: # Avoid division by very small numbers + required_decel = (self.desired_speed ** 2) / (2 * distance_remaining) + + linear_factor = distance_remaining / self.end_stop_distance + adjusted_speed = self.desired_speed * linear_factor + if DEBUG: + print(f"[DEBUG] End deceleration active: {distance_remaining:.2f}m remaining, " + f"required deceleration = {required_decel:.2f} m/s², " + f"speed adjusted to {adjusted_speed:.2f} m/s") + + return True, adjusted_speed + else: + return True, 0.0 + + return False, self.desired_speed + + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: t =", t) + + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: dt =", dt) + + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] CreepTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") + + # Determine progress along the route. + if self.route_progress is None: + self.route_progress = 0.0 + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) + self.route_progress = closest_parameter + + # Check if approaching end of route and get adjusted speed + approaching_end, target_speed = self.check_end_of_route(route, closest_parameter) + if DEBUG and approaching_end: + print("[DEBUG] CreepTrajectoryPlanner.update: Vehicle is approaching end of route") + print(f"[DEBUG] CreepTrajectoryPlanner.update: Target speed = {target_speed}") + + # Extract a 10 m segment of the route for planning lookahead. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead) + + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + stay_braking = False + pointSet = set() + for i in range(len(route_with_lookahead.points)): + if tuple(route_with_lookahead.points[i]) in pointSet: + stay_braking = True + break + pointSet.add(tuple(route_with_lookahead.points[i])) + + should_brake = any( + r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' + for r in state.relations + ) + should_decelerate = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) if should_brake == False else False + + # If approaching end of route, override the target speed + if approaching_end: + should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed) + else: + should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) + + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: stay_braking =", stay_braking) + print("[DEBUG] CreepTrajectoryPlanner.update: should_brake =", should_brake) + print("[DEBUG] CreepTrajectoryPlanner.update: should_accelerate =", should_accelerate) + print("[DEBUG] CreepTrajectoryPlanner.update: should_decelerate =", should_decelerate) + print("[DEBUG] CreepTrajectoryPlanner.update: target_speed =", target_speed if approaching_end else self.desired_speed) + + if stay_braking: + traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_decelerate: + traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.") + elif approaching_end: + # Use linear deceleration to stop at end of route + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, target_speed, curr_v) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan with end-of-route deceleration.") + elif should_accelerate: + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") + else: + # Maintain current speed if not accelerating or braking. + traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print( + "[DEBUG] CreepTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + + self.t_last = t + if DEBUG: + print(f'[DEBUG] Current Velocity: {curr_v}, Target Speed: {target_speed if approaching_end else self.desired_speed}') + print("[DEBUG] CreepTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj \ No newline at end of file From 7223ff67ac39e0407d58ac2f8831931ac3601a22 Mon Sep 17 00:00:00 2001 From: Rohit-R-Rao Date: Fri, 16 May 2025 02:57:14 -0500 Subject: [PATCH 2/3] dup fix attempt --- GEMstack/onboard/planning/creep_planning.py | 48 --------------------- 1 file changed, 48 deletions(-) diff --git a/GEMstack/onboard/planning/creep_planning.py b/GEMstack/onboard/planning/creep_planning.py index 3fff736a..20385632 100644 --- a/GEMstack/onboard/planning/creep_planning.py +++ b/GEMstack/onboard/planning/creep_planning.py @@ -122,46 +122,22 @@ def update(self, state: AllState): vehicle = state.vehicle # type: VehicleState route = state.route # type: Route t = state.t - - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: t =", t) - if self.t_last is None: self.t_last = t dt = t - self.t_last - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: dt =", dt) - curr_x = vehicle.pose.x curr_y = vehicle.pose.y curr_v = vehicle.v - if DEBUG: - print(f"[DEBUG] CreepTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") - - # Determine progress along the route. if self.route_progress is None: self.route_progress = 0.0 _, closest_parameter = route.closest_point_local( [curr_x, curr_y], (self.route_progress - 5.0, self.route_progress + 5.0) ) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) self.route_progress = closest_parameter - - # Check if approaching end of route and get adjusted speed approaching_end, target_speed = self.check_end_of_route(route, closest_parameter) - if DEBUG and approaching_end: - print("[DEBUG] CreepTrajectoryPlanner.update: Vehicle is approaching end of route") - print(f"[DEBUG] CreepTrajectoryPlanner.update: Target speed = {target_speed}") - - # Extract a 10 m segment of the route for planning lookahead. route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead) - print("[DEBUG] state", state.relations) - # Check whether any yield relations (e.g. due to pedestrians) require braking. stay_braking = False pointSet = set() for i in range(len(route_with_lookahead.points)): @@ -179,51 +155,27 @@ def update(self, state: AllState): for r in state.relations ) if should_brake == False else False - # If approaching end of route, override the target speed if approaching_end: should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed) else: should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: stay_braking =", stay_braking) - print("[DEBUG] CreepTrajectoryPlanner.update: should_brake =", should_brake) - print("[DEBUG] CreepTrajectoryPlanner.update: should_accelerate =", should_accelerate) - print("[DEBUG] CreepTrajectoryPlanner.update: should_decelerate =", should_decelerate) - print("[DEBUG] CreepTrajectoryPlanner.update: target_speed =", target_speed if approaching_end else self.desired_speed) - if stay_braking: traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") elif should_brake: traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.") elif should_decelerate: traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.") elif approaching_end: # Use linear deceleration to stop at end of route traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, target_speed, curr_v) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan with end-of-route deceleration.") elif should_accelerate: traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v) - if DEBUG: - print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") else: # Maintain current speed if not accelerating or braking. traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) - if DEBUG: - print( - "[DEBUG] CreepTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") self.t_last = t - if DEBUG: - print(f'[DEBUG] Current Velocity: {curr_v}, Target Speed: {target_speed if approaching_end else self.desired_speed}') - print("[DEBUG] CreepTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") return traj \ No newline at end of file From a6fa4f0d22eb85b8351a78e0cdf499a2432a0fdb Mon Sep 17 00:00:00 2001 From: Rohit-R-Rao Date: Fri, 16 May 2025 03:15:54 -0500 Subject: [PATCH 3/3] dup error fix 2 --- GEMstack/onboard/planning/creep_planning.py | 225 ++++++++------------ 1 file changed, 85 insertions(+), 140 deletions(-) diff --git a/GEMstack/onboard/planning/creep_planning.py b/GEMstack/onboard/planning/creep_planning.py index 20385632..ebd4b3fa 100644 --- a/GEMstack/onboard/planning/creep_planning.py +++ b/GEMstack/onboard/planning/creep_planning.py @@ -7,53 +7,11 @@ from ...utils import serialization from ...mathutils import transforms import numpy as np -from .parking_motion_planning import longitudinal_plan, longitudinal_brake - +from .parking_motion_planning import longitudinal_plan as lp_fn, longitudinal_brake as lb_fn DEBUG = False # Set to False to disable debug output -''' -# OTHER CREEP CONTROL CODE - -def inverse_speed_function(distance_to_object): - -# max_creep_speed = 1.0 # Maximum speed when creeping (m/s) -# min_creep_speed = 0.2 # Minimum speed to maintain (m/s) -# safe_distance = 5.0 # Distance at which to start slowing down (m) - -# # Almost stop when very close (≤0.25m) -# if distance_to_object <= 0.25: -# return 0.0 # Practically zero speed - -# # Slow creep when close (≤1.5m) -# if distance_to_object <= 3.0: -# return min_creep_speed -# speed = min_creep_speed + (max_creep_speed - min_creep_speed) * (distance_to_object / safe_distance) -# return min(speed, max_creep_speed) - -def pid_speed_control(distance_to_object, target_distance, current_speed, prev_error, integral, dt, - kp=0.5, ki=0.1, kd=0.05, min_speed=0.2, max_speed=1.0): - if distance_to_object <= 0.25: - return 0.0, 0.0, 0.0 - error = distance_to_object - target_distance - integral += error * dt - integral = max(-5.0, min(integral, 5.0)) # Prevent integral windup - derivative = (error - prev_error) / dt if dt > 0 else 0 - p_term = kp * error - i_term = ki * integral - d_term = kd * derivative - speed_adjustment = p_term + i_term + d_term - base_speed = 0.5 # Base creep speed - target_speed = base_speed + speed_adjustment - target_speed = max(min_speed, min(target_speed, max_speed)) - return target_speed, error, integral - - -''' - - - class CreepTrajectoryPlanner(Component): """Follows the given route. Brakes if the ego–vehicle must yield (e.g. to a pedestrian) or if the end of the route is near; otherwise, @@ -61,15 +19,14 @@ class CreepTrajectoryPlanner(Component): """ def __init__(self): - self.route_progress = None - self.t_last = None - self.acceleration = 5 - self.desired_speed = 2.0 - self.deceleration = 2.0 - self.emergency_brake = 8.0 - # Parameters for end-of-route linear deceleration - self.end_stop_distance = 12.5 # Distance in meters to start linear deceleration - # Did 15, 10, 7.5, 17.5, 20, 12.5 + self.route_index = None + self.time_prev = None + self.accel_gain = 5 + self.target_vel = 2.0 + self.decel_gain = 2.0 + self.full_brake = 8.0 + self.stop_dist_threshold = 10 # Distance in meters to start linear deceleration + def state_inputs(self): return ['all'] @@ -79,103 +36,91 @@ def state_outputs(self) -> List[str]: def rate(self): return 10.0 - def check_end_of_route(self, route, current_parameter): - """Check if vehicle is approaching the end of the route and calculate - appropriate linear deceleration parameters. - - Args: - route: The complete route - current_parameter: Current position along the route - - Returns: - (bool, float): Tuple containing: - - Whether the vehicle should start decelerating - - Adjusted speed if deceleration needed, otherwise desired_speed - """ - route_length = route.length() - - distance_remaining = route_length - current_parameter - + def is_near_route_end(self, given_route, pos_param): + route_len = given_route.length() + remaining_dist = route_len - pos_param + if DEBUG: - print(f"[DEBUG] check_end_of_route: Route length = {route_length}, " - f"Current position = {current_parameter}, " - f"Distance remaining = {distance_remaining}") - - if distance_remaining <= self.end_stop_distance: - if distance_remaining > 0.1: # Avoid division by very small numbers - required_decel = (self.desired_speed ** 2) / (2 * distance_remaining) - - linear_factor = distance_remaining / self.end_stop_distance - adjusted_speed = self.desired_speed * linear_factor + print(f"[DEBUG] is_near_route_end: Route length = {route_len}, " + f"Current position = {pos_param}, " + f"Distance remaining = {remaining_dist}") + + if remaining_dist <= self.stop_dist_threshold: + if remaining_dist > 0.1: + req_decel = (self.target_vel ** 2) / (2 * remaining_dist) + linear_scale = remaining_dist / self.stop_dist_threshold + speed_mod = self.target_vel * linear_scale if DEBUG: - print(f"[DEBUG] End deceleration active: {distance_remaining:.2f}m remaining, " - f"required deceleration = {required_decel:.2f} m/s², " - f"speed adjusted to {adjusted_speed:.2f} m/s") - - return True, adjusted_speed + print(f"[DEBUG] Linear decel active: {remaining_dist:.2f}m left, " + f"required deceleration = {req_decel:.2f} m/s², " + f"adjusted speed = {speed_mod:.2f} m/s") + return True, speed_mod else: return True, 0.0 - - return False, self.desired_speed - - def update(self, state: AllState): - vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route - t = state.t - if self.t_last is None: - self.t_last = t - dt = t - self.t_last - curr_x = vehicle.pose.x - curr_y = vehicle.pose.y - curr_v = vehicle.v - if self.route_progress is None: - self.route_progress = 0.0 - _, closest_parameter = route.closest_point_local( - [curr_x, curr_y], - (self.route_progress - 5.0, self.route_progress + 5.0) + + return False, self.target_vel + + def update(self, full_state: AllState): + ego = full_state.vehicle # type: VehicleState + nav_path = full_state.route # type: Route + current_time = full_state.t + + if self.time_prev is None: + self.time_prev = current_time + delta_time = current_time - self.time_prev + + ego_x = ego.pose.x + ego_y = ego.pose.y + ego_v = ego.v + + if self.route_index is None: + self.route_index = 0.0 + + _, new_param = nav_path.closest_point_local( + [ego_x, ego_y], + (self.route_index - 5.0, self.route_index + 5.0) ) - self.route_progress = closest_parameter - approaching_end, target_speed = self.check_end_of_route(route, closest_parameter) - route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) - - stay_braking = False - pointSet = set() - for i in range(len(route_with_lookahead.points)): - if tuple(route_with_lookahead.points[i]) in pointSet: - stay_braking = True + self.route_index = new_param + near_end, speed_setpoint = self.is_near_route_end(nav_path, new_param) + trimmed_path = nav_path.trim(new_param, new_param + 10.0) + + duplicate_detected = False + unique_pts = set() + for j in range(len(trimmed_path.points)): + if tuple(trimmed_path.points[j]) in unique_pts: + duplicate_detected = True break - pointSet.add(tuple(route_with_lookahead.points[i])) + unique_pts.add(tuple(trimmed_path.points[j])) - should_brake = any( - r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' - for r in state.relations + brake_trigger = any( + rel.type == EntityRelationEnum.STOPPING_AT and rel.obj1 == '' + for rel in full_state.relations ) - should_decelerate = any( - r.type == EntityRelationEnum.YIELDING and r.obj1 == '' - for r in state.relations - ) if should_brake == False else False + yield_trigger = any( + rel.type == EntityRelationEnum.YIELDING and rel.obj1 == '' + for rel in full_state.relations + ) if not brake_trigger else False - if approaching_end: - should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed) + if near_end: + accelerate = (not brake_trigger and not yield_trigger and ego_v < speed_setpoint) else: - should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) - - if stay_braking: - traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) - elif should_brake: - traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v) - elif should_decelerate: - traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) - elif approaching_end: - # Use linear deceleration to stop at end of route - traj = longitudinal_plan(route_with_lookahead, self.acceleration, - self.deceleration, target_speed, curr_v) - elif should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, - self.deceleration, self.desired_speed, curr_v) + accelerate = (not brake_trigger and not yield_trigger and ego_v < self.target_vel) + + if duplicate_detected: + trajectory = lb_fn(trimmed_path, 0.0, 0.0, 0.0) + elif brake_trigger: + trajectory = lb_fn(trimmed_path, self.full_brake, ego_v) + elif yield_trigger: + trajectory = lb_fn(trimmed_path, self.decel_gain, ego_v) + elif near_end: + trajectory = lp_fn(trimmed_path, self.accel_gain, + self.decel_gain, speed_setpoint, ego_v) + elif accelerate: + trajectory = lp_fn(trimmed_path, self.accel_gain, + self.decel_gain, self.target_vel, ego_v) else: - # Maintain current speed if not accelerating or braking. - traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) + trajectory = lp_fn(trimmed_path, 0.0, self.decel_gain, self.target_vel, ego_v) + + self.time_prev = current_time + return trajectory - self.t_last = t - return traj \ No newline at end of file