diff --git a/GEMstack/knowledge/defaults/computation_graph.yaml b/GEMstack/knowledge/defaults/computation_graph.yaml index 43516d4ce..3c76ef065 100644 --- a/GEMstack/knowledge/defaults/computation_graph.yaml +++ b/GEMstack/knowledge/defaults/computation_graph.yaml @@ -46,18 +46,28 @@ components: - mission_planning: inputs: all outputs: mission + - mission_planning_example: # one way + inputs: all + outputs: mission_plan - route_planning: inputs: all outputs: route - - route_planning_component: - inputs: all - outputs: [ route, mission ] + - planning_route: # one way + inputs: [vehicle,mission_plan, agents, obstacles] + outputs: route - save_lidar_data: inputs: all outputs: - driving_logic: inputs: outputs: intent + - parking_component: # one way + inputs: all + outputs: mission_plan + - summon_component: # one way + inputs: all + outputs: route + outputs: route - _mission_planner: # one way inputs: all outputs: mission_plan @@ -75,4 +85,4 @@ components: outputs: - gazebo_collision_logger: inputs: [] - outputs: \ No newline at end of file + outputs: diff --git a/GEMstack/onboard/planning/collision.py b/GEMstack/onboard/planning/collision.py new file mode 100644 index 000000000..4b73db2c1 --- /dev/null +++ b/GEMstack/onboard/planning/collision.py @@ -0,0 +1,55 @@ +from scipy.ndimage import distance_transform_edt + +def build_collision_lookup(grid, safety_margin=2, vehicle_width=1.0): + """ + Build a collision lookup table for efficient collision checking. + + Args: + grid: Binary occupancy grid (1=obstacle, 0=free) + safety_margin: Additional safety margin in grid cells + vehicle_width: Width of the vehicle in grid cells + + Returns: + Dictionary with distance transform, collision mask, and margin + """ + dist = distance_transform_edt(1 - grid) + margin = safety_margin + vehicle_width / 2.0 + return { + "distance": dist, + "collision_mask": dist <= margin, + "margin": margin, + } + +def fast_collision_check(x, y, lookup): + """ + Check if a point is in collision using the precomputed lookup table. + + Args: + x, y: Coordinates to check + lookup: Collision lookup table from build_collision_lookup + + Returns: + True if in collision, False if free + """ + xi, yi = int(round(x)), int(round(y)) + cm = lookup["collision_mask"] + if xi < 0 or yi < 0 or xi >= cm.shape[0] or yi >= cm.shape[1]: + return True # Out of bounds is considered collision + return cm[xi, yi] + +def path_collision_check(path, lookup, x_off=0, y_off=0): + """ + Check if a path is collision-free. + + Args: + path: List of (x, y, theta) poses + lookup: Collision lookup table + x_off, y_off: Optional offsets to apply + + Returns: + True if collision detected, False if path is free + """ + for px, py, _ in path: + if fast_collision_check(px + x_off, py + y_off, lookup): + return True + return False diff --git a/GEMstack/onboard/planning/highbay_image.pgm b/GEMstack/onboard/planning/highbay_image.pgm new file mode 100644 index 000000000..80c37534b Binary files /dev/null and b/GEMstack/onboard/planning/highbay_image.pgm differ diff --git a/GEMstack/onboard/planning/kinodynamic_rrt_star.py b/GEMstack/onboard/planning/kinodynamic_rrt_star.py new file mode 100644 index 000000000..e6e605055 --- /dev/null +++ b/GEMstack/onboard/planning/kinodynamic_rrt_star.py @@ -0,0 +1,763 @@ +import math +import random +import time +import numpy as np +import matplotlib.pyplot as plt +import scipy.spatial + +from scipy.interpolate import splprep, splev +from .collision import fast_collision_check +import math + +def normalize_angle(angle): + while angle > math.pi: angle -= 2.0 * math.pi + while angle < -math.pi: angle += 2.0 * math.pi + return angle + + +class RRTNode: + """Represents a node in the RRT tree.""" + def __init__(self, x, y, theta, velocity=0.0, phi_steering=0.0, cost=0.0, parent=None): + self.x = x + self.y = y + self.theta = theta # Orientation in radians + self.velocity = velocity + self.phi_steering = phi_steering # Steering angle + self.cost = cost # Cost from the start node to this node + self.parent = parent # Parent node + self.path_from_parent = [] # List of states (x, y, theta, v, phi) forming the path from the parent + + def get_state(self): + """Returns the full state of the node.""" + return (self.x, self.y, self.theta, self.velocity, self.phi_steering) + + def get_position(self): + """Returns the (x, y) position of the node.""" + return (self.x, self.y) + +class OptimizedKinodynamicRRT: + """ + Implements an optimized Bidirectional Kinodynamic RRT* algorithm + for path planning with vehicle dynamics. + """ + def __init__(self, + occupancy_grid, + collision_lookup_table, + start_pose_tuple, # (x, y, theta) + goal_pose_tuple, # (x, y, theta) + max_iterations=100000, + local_sampling_step_size=1.0, + vehicle_width=20.0, + vehicle_length=45.0, + goal_biasing_rate=0.5): + + self.occupancy_grid = occupancy_grid + self.collision_lookup = collision_lookup_table + + self.start_node = RRTNode(*start_pose_tuple, cost=0.0) + self.true_goal_configuration = RRTNode(*goal_pose_tuple) # Goal in world frame + # Goal node for the goal tree (reversed orientation for backward expansion) + self.goal_tree_root_node = RRTNode( + goal_pose_tuple[0], goal_pose_tuple[1], + self._calculate_inverse_angle(goal_pose_tuple[2]), cost=0.0 + ) + + self.max_iterations = max_iterations + self.sampling_step_size = local_sampling_step_size + self.vehicle_length = vehicle_length # Vehicle wheelbase + self.vehicle_width = vehicle_width # Vehicle width + self.time_step_simulation = 0.1 # dt for simulation + self.goal_biasing_rate = goal_biasing_rate # Chance to sample goal directly + + self.nodes_in_start_tree = [self.start_node] + self.nodes_in_goal_tree = [self.goal_tree_root_node] + self.kdtree_for_start_tree = None + self.kdtree_for_goal_tree = None + self.kdtree_rebuild_frequency = 50 # Rebuild KD-tree every N nodes added + + self.is_path_found = False + self.tree_connection_nodes = None # (node_from_start_tree, node_from_goal_tree) + + # Thresholds for considering a connection or goal reached + self.goal_position_tolerance = 20.0 + self.goal_angle_tolerance_rad = math.radians(60.0) + + # Vehicle motion constraints + self.max_steering_angle_rad = math.pi / 3 + self.max_velocity = 10.0 + self.min_velocity = 1.0 # Minimum velocity for PFF fallback + + self.grid_dimensions = self.collision_lookup["collision_mask"].shape # (height, width) + self.planning_time_budget_sec = 10.0 + + self.motion_primitives = self._initialize_motion_primitives() + + # Control and cost parameters + self.proportional_steering_gain = 0.7 # For PFF fallback controller + self.steering_smoothness_weight = 0.1 # Penalty for steering in primitive selection + self.goal_heading_alignment_weight = 250.0 # Strong weight for aligning with target heading + + # Parameters + self.nominal_connection_attempt_distance = 15.0 # Used in _try_connect_trees + self.connection_max_distance_factor = 1.5 # Multiplier for nominal_connection_attempt_distance + self.endpoint_weight = 1e4 + self.base_max_s = 1e4 + + def _calculate_inverse_angle(self, theta_rad): + """Calculates an angle roughly opposite to the input, for backward expansion.""" + angle = normalize_angle(theta_rad) + if angle < 0: + return angle + math.pi + return angle-math.pi + + def _initialize_motion_primitives(self): + """Generates a discrete set of motion primitives.""" + primitives_list = [] + velocities_to_try = [0.5 * self.max_velocity, 0.8 * self.max_velocity, self.max_velocity] + steering_range_limit = 0.9 * self.max_steering_angle_rad + steering_angles_to_try_rad = np.array([ + 0.0, + 0.25 * steering_range_limit, -0.25 * steering_range_limit, + 0.6 * steering_range_limit, -0.6 * steering_range_limit, + steering_range_limit, -steering_range_limit + ]) + + for velocity in velocities_to_try: + for steering_angle in steering_angles_to_try_rad: + num_steps = 15 + if abs(steering_angle) > 0.5 * steering_range_limit: + num_steps = 20 + elif steering_angle == 0.0: + num_steps = 20 if velocity == self.max_velocity else 15 + + primitive_path = self._generate_single_primitive_path(velocity, steering_angle, num_steps) + if primitive_path: + primitives_list.append(primitive_path) + + # Ensure at least one long straight primitive exists + has_long_straight_primitive = any( + p[0][4] == 0.0 and p[0][3] == self.max_velocity and len(p) > 15 + for p in primitives_list + ) + if not has_long_straight_primitive: + primitives_list.append(self._generate_single_primitive_path(self.max_velocity, 0.0, steps=20)) + + print(f"Initialized {len(primitives_list)} motion primitives.") + return primitives_list + + def _generate_single_primitive_path(self, velocity, steering_angle_rad, steps=15): + """Simulates a vehicle path for a given velocity and steering angle.""" + path_states = [(0.0, 0.0, 0.0, velocity, steering_angle_rad)] # Relative initial state + current_x, current_y, current_theta = 0.0, 0.0, 0.0 + + for _ in range(steps): + current_x += velocity * math.cos(current_theta) * self.time_step_simulation + current_y += velocity * math.sin(current_theta) * self.time_step_simulation + current_theta = normalize_angle( + current_theta + (velocity * math.tan(steering_angle_rad) / self.vehicle_length) * self.time_step_simulation + ) + path_states.append((current_x, current_y, current_theta, velocity, steering_angle_rad)) + return path_states + + def _build_kdtree(self, tree_identifier='start'): + """Builds or rebuilds the KD-tree for the specified tree.""" + node_list = self.nodes_in_start_tree if tree_identifier == 'start' else self.nodes_in_goal_tree + if not node_list: + return + + positions = np.array([(node.x, node.y) for node in node_list]) + if positions.ndim == 1: # Single point + positions = positions.reshape(1, -1) + if not positions.size: # Empty array + return + + kdtree_instance = scipy.spatial.cKDTree(positions) + if tree_identifier == 'start': + self.kdtree_for_start_tree = kdtree_instance + else: + self.kdtree_for_goal_tree = kdtree_instance + + def _get_nearest_node_in_tree(self, query_position_xy, tree_identifier='start'): + """Finds the nearest node in the specified tree to the query_position_xy.""" + target_node_list = self.nodes_in_start_tree if tree_identifier == 'start' else self.nodes_in_goal_tree + target_kdtree = self.kdtree_for_start_tree if tree_identifier == 'start' else self.kdtree_for_goal_tree + + if not target_node_list: + return None + + if target_kdtree is None or len(target_node_list) % self.kdtree_rebuild_frequency == 0: + self._build_kdtree(tree_identifier) + target_kdtree = self.kdtree_for_start_tree if tree_identifier == 'start' else self.kdtree_for_goal_tree + + if target_kdtree is None: # Fallback if KD-tree build failed or list is too small + return min( + target_node_list, + key=lambda node: math.hypot(node.x - query_position_xy[0], node.y - query_position_xy[1]), + default=None + ) + + _, nearest_indices = target_kdtree.query(query_position_xy) + nearest_index = nearest_indices[0] if isinstance(nearest_indices, (np.ndarray, list)) else nearest_indices + nearest_index = int(nearest_index) + + if 0 <= nearest_index < len(target_node_list): + return target_node_list[nearest_index] + return None + + def _get_random_global_target_sample(self): + """Samples a random (x, y) position within the grid boundaries.""" + random_x = random.uniform(0, self.grid_dimensions[1] - 1) # Grid width + random_y = random.uniform(0, self.grid_dimensions[0] - 1) # Grid height + return (random_x, random_y) + + def _generate_new_node_kinodynamically(self, + source_node, + target_position_xy, + optional_target_theta_rad=None, + tree_identifier='start', # Unused in this version, kept for signature consistency + is_direct_connection_attempt=False, + force_heading_guidance_for_local_exploration=False): # Unused in this version + """ + Generates a new node by applying motion primitives or a fallback controller + from source_node towards target_position_xy. + """ + target_state_for_scoring = (target_position_xy[0], target_position_xy[1], optional_target_theta_rad) + best_score_for_primitive = float('inf') + best_resulting_node_from_primitive = None + + # Determine if heading cost should be applied for primitive selection + apply_heading_cost = (optional_target_theta_rad is not None) and \ + (is_direct_connection_attempt or force_heading_guidance_for_local_exploration) + + # Try motion primitives first + for primitive_path_model in self.motion_primitives: + if not primitive_path_model: + continue + + transformed_primitive_path = self._transform_primitive_to_global_frame(primitive_path_model, source_node) + if not transformed_primitive_path or len(transformed_primitive_path) < 2: + continue + + end_state_of_primitive = transformed_primitive_path[-1] + # Avoid trivial primitives that don't move + if math.hypot(end_state_of_primitive[0] - source_node.x, end_state_of_primitive[1] - source_node.y) < 0.01: + continue + + # Calculate cost for this primitive + position_distance_cost = math.hypot( + end_state_of_primitive[0] - target_state_for_scoring[0], + end_state_of_primitive[1] - target_state_for_scoring[1] + ) + current_primitive_score = position_distance_cost + \ + abs(primitive_path_model[0][4]) * self.steering_smoothness_weight # Penalty for steering + + if apply_heading_cost: + angle_difference_rad = abs(normalize_angle(end_state_of_primitive[2] - target_state_for_scoring[2])) + current_primitive_score += angle_difference_rad * self.goal_heading_alignment_weight + + if current_primitive_score < best_score_for_primitive: + is_collision_free = True + # Check collision for all waypoints in the transformed primitive, skipping the first (source_node) + for point_state in transformed_primitive_path[1:]: + if fast_collision_check(point_state[1], point_state[0], self.collision_lookup): + is_collision_free = False + break + + if is_collision_free: + best_score_for_primitive = current_primitive_score + segment_time_cost = (len(primitive_path_model) - 1) * self.time_step_simulation + best_resulting_node_from_primitive = RRTNode( + *end_state_of_primitive, # x, y, theta, v, phi + cost=source_node.cost + segment_time_cost, + parent=source_node + ) + best_resulting_node_from_primitive.path_from_parent = transformed_primitive_path + + if best_resulting_node_from_primitive: + return best_resulting_node_from_primitive + + # Fallback: If no primitive was suitable, try Proportional Feedback Controller (PFF) / "apply_controls" + return self._apply_proportional_feedback_control(source_node, target_state_for_scoring, is_direct_connection_attempt) + + + def _apply_proportional_feedback_control(self, + source_node, + target_state_tuple, # (x, y, optional_theta) + is_direct_connection_attempt=False): # Unused in this version + """ + Fallback controller: Steers the vehicle from source_node towards target_state_tuple. + This is a simplified Proportional Feedback Controller (PFF). + """ + current_x, current_y, current_theta, _, _ = source_node.get_state() + target_x, target_y, _ = target_state_tuple # Target theta not directly used by this PFF for steering cmd + + delta_x, delta_y = target_x - current_x, target_y - current_y + distance_to_target_xy = math.hypot(delta_x, delta_y) + + if distance_to_target_xy < 0.1: # Already close enough + return None + + path_segment_states = [source_node.get_state()] + + # Scale velocity based on distance, similar to original step_size logic + # This PFF generates segments of length comparable to one visualization sampling step + velocity_scale_factor = min(1.0, distance_to_target_xy / (2.0 * self.sampling_step_size)) + applied_velocity = self.max_velocity * velocity_scale_factor + + # Adjust velocity based on alignment with target direction + alignment_with_target_direction = max(0.1, math.cos(normalize_angle(math.atan2(delta_y, delta_x) - current_theta))) + applied_velocity = max(self.min_velocity, applied_velocity * alignment_with_target_direction) + applied_velocity = min(applied_velocity, self.max_velocity) + + # Proportional steering control + heading_error_rad = normalize_angle(math.atan2(delta_y, delta_x) - current_theta) + applied_steering_angle = max( + -self.max_steering_angle_rad * 0.9, + min(self.proportional_steering_gain * heading_error_rad, self.max_steering_angle_rad * 0.9) + ) + + # Determine number of simulation steps for this segment + # Aim for segment length related to visualization_sampling_step_size or distance to target + effective_step_length = applied_velocity * self.time_step_simulation + target_segment_length = min(distance_to_target_xy, self.sampling_step_size) + num_simulation_steps = max(5, min(int(target_segment_length / (effective_step_length + 1e-6)), 25)) + + + sim_x, sim_y, sim_theta = current_x, current_y, current_theta + for _ in range(num_simulation_steps): + next_x = sim_x + applied_velocity * math.cos(sim_theta) * self.time_step_simulation + next_y = sim_y + applied_velocity * math.sin(sim_theta) * self.time_step_simulation + next_theta = normalize_angle( + sim_theta + (applied_velocity * math.tan(applied_steering_angle) / self.vehicle_length) * self.time_step_simulation + ) + + if fast_collision_check(next_y, next_x, self.collision_lookup): + return None # Collision detected + + sim_x, sim_y, sim_theta = next_x, next_y, next_theta + path_segment_states.append((sim_x, sim_y, sim_theta, applied_velocity, applied_steering_angle)) + + if len(path_segment_states) <= 1: # No valid step taken + return None + + final_state_in_segment = path_segment_states[-1] + new_node_from_pff = RRTNode( + *final_state_in_segment, # x, y, theta, v, phi + cost=source_node.cost + (len(path_segment_states) - 1) * self.time_step_simulation, + parent=source_node + ) + new_node_from_pff.path_from_parent = path_segment_states + return new_node_from_pff + + + def _extend_tree(self, + tree_node_list, + tree_identifier, # 'start' or 'goal' + global_target_position_xy): + """Extends the specified tree towards the global_target_position_xy.""" + nearest_node_in_tree = self._get_nearest_node_in_tree(global_target_position_xy, tree_identifier) + if not nearest_node_in_tree: + return None + + optional_target_theta_for_expansion = None + is_attempting_direct_connection_to_overall_goal = False + + # If sampling the actual start/goal, provide its orientation for guidance + if tree_identifier == 'start' and \ + global_target_position_xy == (self.true_goal_configuration.x, self.true_goal_configuration.y): + optional_target_theta_for_expansion = self.true_goal_configuration.theta + is_attempting_direct_connection_to_overall_goal = True + elif tree_identifier == 'goal' and \ + global_target_position_xy == (self.start_node.x, self.start_node.y): + # For goal tree expanding towards start, use start_node's true world theta (not inverse) + optional_target_theta_for_expansion = self.start_node.theta + is_attempting_direct_connection_to_overall_goal = True + + newly_generated_node = self._generate_new_node_kinodynamically( + source_node=nearest_node_in_tree, + target_position_xy=global_target_position_xy, + optional_target_theta_rad=optional_target_theta_for_expansion, + tree_identifier=tree_identifier, # Passed along, though _generate_new_node doesn't currently use it directly + is_direct_connection_attempt=is_attempting_direct_connection_to_overall_goal, + force_heading_guidance_for_local_exploration=False # Typically false for general extension + ) + + if newly_generated_node: + tree_node_list.append(newly_generated_node) + return newly_generated_node + return None + + + def _try_connect_trees(self, + node_from_active_tree, # The newly added node + active_tree_identifier): # 'start' or 'goal' + """Attempts to connect node_from_active_tree to the other tree.""" + other_tree_identifier = 'goal' if active_tree_identifier == 'start' else 'start' + + nearest_node_in_other_tree = self._get_nearest_node_in_tree( + (node_from_active_tree.x, node_from_active_tree.y), other_tree_identifier + ) + if not nearest_node_in_other_tree: + return False + + distance_between_nodes = math.hypot( + node_from_active_tree.x - nearest_node_in_other_tree.x, + node_from_active_tree.y - nearest_node_in_other_tree.y + ) + + # Check if nodes are too far apart for a connection attempt + max_allowed_connection_distance = self.connection_max_distance_factor * self.nominal_connection_attempt_distance + if distance_between_nodes > max_allowed_connection_distance: + return False + + # Define connection target based on the 'other' tree's node + target_connection_x = nearest_node_in_other_tree.x + target_connection_y = nearest_node_in_other_tree.y + # Target theta must match the frame of the 'other' tree's node + # If 'other' is goal tree, its nodes have inverse angles. We need to aim for that inverse angle. + # If 'other' is start tree, its nodes have world angles. Aim for that. + target_connection_theta_rad = nearest_node_in_other_tree.theta # This is correct because nodes store their 'effective' forward theta + + # Attempt to generate a path segment from node_from_active_tree to nearest_node_in_other_tree + connection_candidate_node = self._generate_new_node_kinodynamically( + source_node=node_from_active_tree, + target_position_xy=(target_connection_x, target_connection_y), + optional_target_theta_rad=target_connection_theta_rad, + tree_identifier=active_tree_identifier, + is_direct_connection_attempt=True, # This is a focused connection + force_heading_guidance_for_local_exploration=True # Strongly guide heading + ) + + if connection_candidate_node: + # The connection_candidate_node is an extension of node_from_active_tree. + # Its theta is in the 'forward' frame of the active_tree. + # We need to check if this new node (connection_candidate_node) is close enough + # in pose to the nearest_node_in_other_tree. + + final_connection_node_theta = connection_candidate_node.theta + + position_criteria_met = math.hypot( + connection_candidate_node.x - target_connection_x, + connection_candidate_node.y - target_connection_y + ) <= self.goal_position_tolerance # Using goal_position_tolerance for connection tightness + + # Angle check: connection_candidate_node's theta should align with target_connection_theta_rad + angle_diff_to_target_node = abs(normalize_angle(final_connection_node_theta - target_connection_theta_rad)) + angle_criteria_to_target_met = angle_diff_to_target_node <= self.goal_angle_tolerance_rad + + # Sanity check: the connection segment itself should be somewhat aligned with the source node's direction + # This helps prevent awkward, immediate reverse connections. + # node_from_active_tree.theta is already in its correct forward frame. + angle_diff_from_source_node = abs(normalize_angle(final_connection_node_theta - node_from_active_tree.theta)) + angle_criteria_from_source_met = angle_diff_from_source_node <= self.goal_angle_tolerance_rad * 1.5 # Slightly more lenient + + if position_criteria_met and angle_criteria_to_target_met and angle_criteria_from_source_met: + if active_tree_identifier == 'start': + self.tree_connection_nodes = (connection_candidate_node, nearest_node_in_other_tree) + else: # active_tree was 'goal' + self.tree_connection_nodes = (nearest_node_in_other_tree, connection_candidate_node) + self.is_path_found = True + return True + return False + + + def plan(self, visualize_planning_output=True): + """Main planning loop for the Bidirectional Kinodynamic RRT.""" + print("Starting optimized Kinodynamic RRT* (bidirectional) planning...") + self.planning_start_time = time.time() + + for iteration in range(self.max_iterations): + if time.time() - self.planning_start_time > self.planning_time_budget_sec: + print("Planning time budget exceeded.") + break + + if (iteration + 1) % 200 == 0: + print(f"Iteration {iteration + 1}, " + f"Start Tree Size: {len(self.nodes_in_start_tree)}, " + f"Goal Tree Size: {len(self.nodes_in_goal_tree)}") + + # Determine which tree to extend: prioritize smaller tree, otherwise alternate + extend_start_tree_this_iteration = len(self.nodes_in_start_tree) <= len(self.nodes_in_goal_tree) + if abs(len(self.nodes_in_start_tree) - len(self.nodes_in_goal_tree)) <= 5 : # If balanced + extend_start_tree_this_iteration = (iteration % 2 == 0) + + + active_tree_nodes, active_tree_identifier = \ + (self.nodes_in_start_tree, 'start') if extend_start_tree_this_iteration \ + else (self.nodes_in_goal_tree, 'goal') + + # Determine the overall bias target (actual goal for start tree, actual start for goal tree) + # This is used by the _sample_position_for_visualization for its internal scoring + overall_bias_target_position_xy = \ + (self.true_goal_configuration.x, self.true_goal_configuration.y) if active_tree_identifier == 'start' \ + else (self.start_node.x, self.start_node.y) + + # 1. Get a global random sample (or biased sample towards goal/start) + if random.random() < self.goal_biasing_rate: + sampled_global_target_position = overall_bias_target_position_xy + else: + sampled_global_target_position = self._get_random_global_target_sample() + + # Debug prints for early iterations + if iteration < 10: + print(f"Iter {iteration}: Global RRT Target: {sampled_global_target_position}") + + + # 3. Extend the chosen RRT tree using the sampled_global_target_position + newly_added_node_to_tree = self._extend_tree( + active_tree_nodes, + active_tree_identifier, + sampled_global_target_position + ) + + if newly_added_node_to_tree: + # 4. Try to connect the new node to the other tree + if self._try_connect_trees(newly_added_node_to_tree, active_tree_identifier): + print(f"Path found and connected at iteration {iteration + 1}!") + break # Exit main planning loop + + if not self.is_path_found: + print(f"Path planning finished. No path found within {self.max_iterations} iterations or time budget.") + + final_path_tuples = None + if self.is_path_found: + final_path_tuples = self._extract_final_path() + + if visualize_planning_output: + self._visualize_bidirectional_rrt_result(final_path_tuples) + + return final_path_tuples + + def _extract_final_path(self): + """Reconstructs the path from start to goal once trees are connected.""" + if not self.is_path_found or not self.tree_connection_nodes: + return None + + # node_at_start_tree_conn_point is the node in the start-tree part of the connection + # node_at_goal_tree_conn_point is the node in the goal-tree part of the connection + node_at_start_tree_conn_point, node_at_goal_tree_conn_point = self.tree_connection_nodes + + # Part 1: Path from actual start to the connection point on the start tree side + path_start_to_connection_tuples = [] + current_node = node_at_start_tree_conn_point + while current_node: + if current_node.path_from_parent: + # path_from_parent is (x,y,theta,v,phi), we need (x,y,theta) + segment = [(p[0], p[1], p[2]) for p in current_node.path_from_parent] + path_start_to_connection_tuples = segment + path_start_to_connection_tuples + elif current_node == self.start_node: # Ensure start node itself is included if no path_from_parent + path_start_to_connection_tuples.insert(0, (current_node.x, current_node.y, current_node.theta)) + current_node = current_node.parent + path_start_to_connection_tuples = self._deduplicate_path_waypoints(path_start_to_connection_tuples) + + + # Part 2: Path from actual goal to the connection point on the goal tree side (then reverse it) + path_goal_to_connection_reversed_tuples = [] + current_node = node_at_goal_tree_conn_point # This node is part of the GOAL tree + while current_node: + if current_node.path_from_parent: + # Path from parent in goal tree is kinematically forward, but angle is inverse. + # So, when reconstructing, convert angles back to world frame. + segment = [(p[0], p[1], self._calculate_inverse_angle(p[2])) for p in current_node.path_from_parent] + path_goal_to_connection_reversed_tuples = segment + path_goal_to_connection_reversed_tuples + elif current_node == self.goal_tree_root_node: + path_goal_to_connection_reversed_tuples.insert(0, (current_node.x, current_node.y, + self._calculate_inverse_angle(current_node.theta))) + current_node = current_node.parent + path_goal_to_connection_reversed_tuples = self._deduplicate_path_waypoints(path_goal_to_connection_reversed_tuples) + + # Reverse the goal-to-connection path to get connection-to-goal path + path_connection_to_goal_tuples = list(reversed(path_goal_to_connection_reversed_tuples)) + + # Combine the two path segments + final_path_tuples = path_start_to_connection_tuples + if path_start_to_connection_tuples and path_connection_to_goal_tuples: + # Check if the last point of start_path and first of goal_path are too close (likely duplicates) + last_of_start = final_path_tuples[-1] + first_of_goal = path_connection_to_goal_tuples[0] + if (abs(last_of_start[0] - first_of_goal[0]) < 0.1 and + abs(last_of_start[1] - first_of_goal[1]) < 0.1 and + abs(normalize_angle(last_of_start[2] - first_of_goal[2])) < math.radians(5)): + final_path_tuples.extend(path_connection_to_goal_tuples[1:]) # Skip duplicate + else: + final_path_tuples.extend(path_connection_to_goal_tuples) + elif path_connection_to_goal_tuples: # If start path was empty (e.g. start IS connection) + final_path_tuples.extend(path_connection_to_goal_tuples) + + return self._smooth_path(final_path_tuples) + + def _smooth_path(self, path): + """ + Fits a cubic smoothing spline through (x,y) that + *always* passes exactly through the first and last points, + while still smoothing the interior subject to your max-curvature constraint. + """ + if len(path) < 3: + return path + + # 1) extract raw waypoints + xs = np.array([p[0] for p in path]) + ys = np.array([p[1] for p in path]) + + # Build a weight vector so that endpoints are “hard” constraints + w = np.ones(len(xs)) + w[0] = w[-1] = self.endpoint_weight + + # 2) compute curvature limit + L = self.vehicle_length + phi_max = self.max_steering_angle_rad + kappa_max = math.tan(phi_max) / L + + # 3) fit with increasing smoothness until curvature is OK + s_val = 0.0 + while True: + # pass our custom weights into splprep + tck, u = splprep([xs, ys], s=s_val, w=w) + u_fine = np.linspace(0, 1, len(path)) + xs_s, ys_s = splev(u_fine, tck) + + # curvature check + dx = np.gradient(xs_s, u_fine) + dy = np.gradient(ys_s, u_fine) + ddx = np.gradient(dx, u_fine) + ddy = np.gradient(dy, u_fine) + curvature = np.abs(dx*ddy - dy*ddx) / (dx*dx + dy*dy)**1.5 + + if np.nanmax(curvature) <= kappa_max or s_val >= self.base_max_s: + break + s_val = max(1e-3, s_val * 2 if s_val>0 else 1.0) + + # Just to be *absolutely* sure, re-enforce exact endpoints: + xs_s[0], ys_s[0] = xs[0], ys[0] + xs_s[-1], ys_s[-1] = xs[-1], ys[-1] + + # 4) reconstruct headings from smoothed trajectory + thetas = np.arctan2( + np.gradient(ys_s, u_fine), + np.gradient(xs_s, u_fine) + ) + + return list(zip(xs_s, ys_s, thetas)) + + def _deduplicate_path_waypoints(self, path_tuples_list): + """Removes consecutive duplicate or very close waypoints from a path list.""" + if not path_tuples_list: + return [] + + deduplicated_path = [path_tuples_list[0]] + for i in range(1, len(path_tuples_list)): + p1_x, p1_y, p1_theta = deduplicated_path[-1] + p2_x, p2_y, p2_theta = path_tuples_list[i] + + position_differs = abs(p1_x - p2_x) >= 1e-3 or abs(p1_y - p2_y) >= 1e-3 + angle_differs = abs(normalize_angle(p1_theta - p2_theta)) >= 1e-3 + + if position_differs or angle_differs: + deduplicated_path.append(path_tuples_list[i]) + return deduplicated_path + + + def _transform_primitive_to_global_frame(self, relative_primitive_path, source_node_global_frame): + """Transforms a relative motion primitive path to the global coordinate frame of source_node.""" + if not relative_primitive_path: + return [] + + transformed_path_segment_states = [] + base_x, base_y, base_theta_rad = source_node_global_frame.x, source_node_global_frame.y, source_node_global_frame.theta + + for i, (rel_x, rel_y, rel_theta, prim_v, prim_phi) in enumerate(relative_primitive_path): + # Rotate + rotated_relative_x = rel_x * math.cos(base_theta_rad) - rel_y * math.sin(base_theta_rad) + rotated_relative_y = rel_x * math.sin(base_theta_rad) + rel_y * math.cos(base_theta_rad) + # Translate + global_x = rotated_relative_x + base_x + global_y = rotated_relative_y + base_y + # Combine angles + global_theta_rad = normalize_angle(rel_theta + base_theta_rad) + + transformed_path_segment_states.append((global_x, global_y, global_theta_rad, prim_v, prim_phi)) + return transformed_path_segment_states + + + def _visualize_bidirectional_rrt_result(self, final_path_tuples): + """Visualizes the RRT trees and the final path.""" + plt.figure(figsize=(13, 13)) + # Plot occupancy grid + plt.imshow( + self.occupancy_grid[::-1], cmap='gray', + extent=[0, self.grid_dimensions[1], 0, self.grid_dimensions[0]] + ) + + # Plot tree edges + tree_configs = [ + (self.nodes_in_start_tree, 'deepskyblue', 'Start Tree Edge'), + (self.nodes_in_goal_tree, 'lightcoral', 'Goal Tree Edge') + ] + for node_list, color_str, label_str in tree_configs: + plotted_label = False + for node in node_list: + if node.parent and node.path_from_parent and len(node.path_from_parent) > 0: + path_xs = [p[0] for p in node.path_from_parent] + path_ys = [p[1] for p in node.path_from_parent] + if not plotted_label: + plt.plot(path_xs, path_ys, color=color_str, alpha=0.5, linewidth=1.5, label=label_str) + plotted_label = True + else: + plt.plot(path_xs, path_ys, color=color_str, alpha=0.5, linewidth=1.5) + + + # Plot Start and Goal nodes + plt.scatter( + self.start_node.x, self.start_node.y, color='lime', s=120, marker='o', + edgecolor='black', label='Start Pose', zorder=5 + ) + plt.arrow( + self.start_node.x, self.start_node.y, + 20.0 * math.cos(self.start_node.theta), 20.0 * math.sin(self.start_node.theta), # Increased arrow length + color='lime', head_width=7.0, head_length=9.0, edgecolor='black', linewidth=1.5, zorder=5 # Increased arrow size + ) + plt.scatter( + self.true_goal_configuration.x, self.true_goal_configuration.y, color='red', s=120, marker='X', + edgecolor='black', label='Goal Pose', zorder=5 + ) + plt.arrow( + self.true_goal_configuration.x, self.true_goal_configuration.y, + 20.0 * math.cos(self.true_goal_configuration.theta), 20.0 * math.sin(self.true_goal_configuration.theta), # Increased arrow length + color='red', head_width=7.0, head_length=9.0, edgecolor='black', linewidth=1.5, zorder=5 # Increased arrow size + ) + + # Plot connection points if path found + if self.tree_connection_nodes: + node_s_conn, node_g_conn = self.tree_connection_nodes + plt.scatter( + node_s_conn.x, node_s_conn.y, color='gold', s=150, marker='*', + edgecolor='black', label='Connection Pt (Start Tree Side)', zorder=6 + ) + plt.scatter( + node_g_conn.x, node_g_conn.y, color='darkorange', s=150, marker='*', # Changed color for distinctness + edgecolor='black', label='Connection Pt (Goal Tree Side)', zorder=6 + ) + + # Plot the final path + if final_path_tuples and len(final_path_tuples) > 1: + path_xs = [p[0] for p in final_path_tuples] + path_ys = [p[1] for p in final_path_tuples] + plt.plot(path_xs, path_ys, color='green', linestyle='-', linewidth=2.0, label='Final Path') + # Plot orientation arrows along the path + for i in range(0, len(final_path_tuples), max(1, len(final_path_tuples) // 25)): # ~25 arrows + plt.arrow( + final_path_tuples[i][0], final_path_tuples[i][1], + 12.0 * math.cos(final_path_tuples[i][2]), 12.0 * math.sin(final_path_tuples[i][2]), # Increased arrow length + color='darkgreen', head_width=3.0, head_length=5.0, alpha=0.8, zorder=4 # Increased arrow size + ) + + plt.title("Bidirectional Kinodynamic RRT* Path Planning Result") + plt.xlabel("X-position (grid units)") + plt.ylabel("Y-position (grid units)") + plt.legend(fontsize='small') + plt.axis('equal') + plt.tight_layout() + + # Save the figure + plt.savefig(f"bidirectional_kinodynamic_rrt_path_{time.time()}.png", dpi=300) + # plt.show() # Uncomment to display plot interactively diff --git a/GEMstack/onboard/planning/mission_planning_example.py b/GEMstack/onboard/planning/mission_planning_example.py new file mode 100644 index 000000000..26051dbd0 --- /dev/null +++ b/GEMstack/onboard/planning/mission_planning_example.py @@ -0,0 +1,47 @@ +from typing import List + +from GEMstack.utils import settings +from ..component import Component +from ...utils import serialization +from ...state import Route,ObjectFrameEnum, AllState, PlannerEnum, MissionPlan, ObjectPose +import os +import numpy as np +import time +import math + +DEBUG = True + +class MissionPlanningExample(Component): + def __init__(self): + self.start_time = time.time() + self.goal_start_pose = None + self.mode = settings.get("run.mode") + pass + + def state_inputs(self): + return ["all"] + + def rate(self): + return 10.0 + + def state_outputs(self)-> List[str]: + return ["mission_plan"] + + def update(self, state: AllState): + if DEBUG: + print("PARKING COMPONENT: Entering RRT mode") + if self.goal_start_pose == None: + self.goal_start_pose = ObjectPose( + frame=ObjectFrameEnum.START, + t=state.start_vehicle_pose.t, + x=state.vehicle.pose.x + 35, + y=state.vehicle.pose.y, + yaw=state.vehicle.pose.yaw + math.pi, + ) + + mission_plan = MissionPlan(PlannerEnum.RRT_STAR, self.goal_start_pose, state.start_vehicle_pose) + + if DEBUG: + print("Planning component update with state:",mission_plan) + + return mission_plan diff --git a/GEMstack/onboard/planning/occupancy_grid.py b/GEMstack/onboard/planning/occupancy_grid.py new file mode 100644 index 000000000..d91a0ab7b --- /dev/null +++ b/GEMstack/onboard/planning/occupancy_grid.py @@ -0,0 +1,421 @@ +from __future__ import print_function + +# Python Headers +import os +import cv2 +import numpy as np + +import rospy +from cv_bridge import CvBridge, CvBridgeError +from sensor_msgs.msg import Image + +image_path = os.path.join(os.path.dirname(__file__), "highbay_image.pgm") +DEBUG = True + +class OccupancyGrid: + + global image_path + + def __init__(self): + + # Read image in BGR format + self.map_image = cv2.imread(image_path) + + # Create the cv_bridge object + self.bridge = CvBridge() + self.map_image_pub = rospy.Publisher("/motion_image", Image, queue_size=1) + self.obs_image_pub = rospy.Publisher("/obstacles", Image, queue_size=1) + # Subscribe information from sensors + self.lat = 0 + self.lon = 0 + self.heading = 0 + + # # Klampt + # self.lat_scale = 80 + # self.lon_scale = 80 + # self.lat_start_bt = -1 + # self.lon_start_l = -1 + + # Real vehicle + self.lat_scale = 0.00062 # 0.0007 + self.lon_scale = 0.00136 # 0.00131 + self.lat_start_bt = 40.092722 # 40.09269 + self.lon_start_l = -88.236365 # -88.23628 + + # Gazebo + # self.lat_scale = 0.000428581 + # self.lon_scale = -0.000267982 + # self.lat_start_bt = -0.0001 # Needs Tuning + # self.lon_start_l = -0.0004 + + self.arrow = 40 + self.img_width = 2107 + self.img_height = 1313 + + def image_to_gnss(self, pix_x, pix_y): + """ + Given image coordinates (pix_x, pix_y), return (latitude, longitude). + Inverse of your: + pix_x = img_width * (lon - lon_start_l) / lon_scale + pix_y = img_height - img_height*(lat - lat_start_bt) / lat_scale + """ + lon = self.lon_start_l + (pix_x / float(self.img_width)) * self.lon_scale + lat = ( + self.lat_start_bt + + ((self.img_height - pix_y) / float(self.img_height)) * self.lat_scale + ) + + return lat, lon + + def image_heading(self, lon_x, lat_y, heading): + + if heading >= 0 and heading < 90: + angle = np.radians(90 - heading) + lon_xd = lon_x + int(self.arrow * np.cos(angle)) + lat_yd = lat_y - int(self.arrow * np.sin(angle)) + + elif heading >= 90 and heading < 180: + angle = np.radians(heading - 90) + lon_xd = lon_x + int(self.arrow * np.cos(angle)) + lat_yd = lat_y + int(self.arrow * np.sin(angle)) + + elif heading >= 180 and heading < 270: + angle = np.radians(270 - heading) + lon_xd = lon_x - int(self.arrow * np.cos(angle)) + lat_yd = lat_y + int(self.arrow * np.sin(angle)) + + else: + angle = np.radians(heading - 270) + lon_xd = lon_x - int(self.arrow * np.cos(angle)) + lat_yd = lat_y - int(self.arrow * np.sin(angle)) + + return lon_xd, lat_yd + + def draw_obstacle_cones(self, cones_gnss_list, cone_cluster_pixel_threshold=100): + """ + Converts GNSS coordinates of obstacle cones to image coordinates, + uses a naive greedy algorithm to cluster nearby cones, draws appropriate + bounding boxes, and publishes the map. + - Isolated cones (or clusters of 1) get a 20x20 buffered box. + - Groups of nearby cones (clusters > 1) get a common bounding box. + + Args: + cones_gnss_list (list): A list of tuples, where each tuple is (lon, lat) + representing an obstacle cone. + cone_cluster_pixel_threshold (int): The maximum distance in pixels between a point + and a cluster for the point to be added to it. + """ + rects = [] + if DEBUG: + print(f"Draw_obstacle_cones called with {len(cones_gnss_list)} cones. Threshold: {cone_cluster_pixel_threshold}px") + if cones_gnss_list: + print(f"Input cones_gnss_list: {cones_gnss_list}") + + pub_image = np.copy(self.map_image) + + if not cones_gnss_list: + if DEBUG: + print("No cones in cones_gnss_list. Publishing original image.") + try: + self.obs_image_pub.publish(self.bridge.cv2_to_imgmsg(pub_image, "bgr8")) + except CvBridgeError as e: # Assuming CvBridgeError and rospy are available + rospy.logerr("CvBridge Error (no cones): {0}".format(e)) + return rects + + valid_image_points = [] + if DEBUG: + print("Processing cones for image conversion...") + for i, (lon, lat) in enumerate(cones_gnss_list): + if DEBUG: + print(f"Cone {i}: lon={lon}, lat={lat}") + lon_x = int(self.img_width * (lon - self.lon_start_l) / self.lon_scale) + lat_y = int( + self.img_height + - self.img_height * (lat - self.lat_start_bt) / self.lat_scale + ) + if DEBUG: + print(f"Cone {i}: Converted to x={lon_x}, y={lat_y}") + + if 0 <= lon_x < self.img_width and 0 <= lat_y < self.img_height: + valid_image_points.append((lon_x, lat_y)) + else: + print(f"Cone {i} ({lon_x},{lat_y}) is INVALID (outside image bounds: w={self.img_width}, h={self.img_height}).") + + if valid_image_points: + print(f"Num of valid_image_points: {len(valid_image_points)}") + + if not valid_image_points: + if DEBUG: + print("No valid cones within image boundaries. Publishing current image.") + try: + self.obs_image_pub.publish(self.bridge.cv2_to_imgmsg(pub_image, "bgr8")) + except CvBridgeError as e: + rospy.logerr("CvBridge Error (no valid cones in image): {0}".format(e)) + return rects + + if len(valid_image_points) == 1: + if DEBUG: + print("Only one valid cone. Drawing single cone box.") + x, y = valid_image_points[0] + buffer = 10 + pt1_x = max(0, x - buffer) + pt1_y = max(0, y - buffer) + pt2_x = min(self.img_width - 1, x + buffer) + pt2_y = min(self.img_height - 1, y + buffer) + + if pt1_x < pt2_x and pt1_y < pt2_y: + cv2.rectangle(pub_image, (pt1_x, pt1_y), (pt2_x, pt2_y), (255, 100, 100), 2) # Light Blue box + rects.append((pt1_x, pt1_y, pt2_x, pt2_y)) + else: + print("Single cone rectangle has zero or negative area, not drawn.") + + elif len(valid_image_points) > 1: + if DEBUG: + print(f"Multiple valid cones ({len(valid_image_points)}). Performing naive greedy clustering.") + + threshold_sq = cone_cluster_pixel_threshold ** 2 # Compare squared distances + + remaining_points_to_cluster = list(valid_image_points) + final_clusters = [] + + while remaining_points_to_cluster: + seed_point = remaining_points_to_cluster.pop(0) + + current_cluster_points_list = [seed_point] + current_cluster_points_set = {seed_point} + + queue = [seed_point] + head_of_queue = 0 + + while head_of_queue < len(queue): + point_to_expand_from = queue[head_of_queue] + head_of_queue += 1 + + # Iterate over remaining_points_to_cluster to find neighbors + # To avoid modifying list while iterating, build a new list for the *next* iteration of remaining_points_to_cluster + next_iteration_remaining_points = [] + for other_point in remaining_points_to_cluster: + dist_sq = (point_to_expand_from[0] - other_point[0])**2 + \ + (point_to_expand_from[1] - other_point[1])**2 + + if dist_sq <= threshold_sq: + # Point is close, add to current cluster and queue if not already processed for this cluster + if other_point not in current_cluster_points_set: + current_cluster_points_list.append(other_point) + current_cluster_points_set.add(other_point) + queue.append(other_point) + # This point is now claimed by the current_cluster, so it won't be in next_iteration_remaining_points + else: + # This point is not close to point_to_expand_from, keep it for future clusters + next_iteration_remaining_points.append(other_point) + + remaining_points_to_cluster = next_iteration_remaining_points # Update list for the next point in queue or next seed + + final_clusters.append(current_cluster_points_list) + if DEBUG: + print(f"Formed cluster (size {len(current_cluster_points_list)}): {current_cluster_points_list}") + if DEBUG: + print(f"Naive clustering resulted in {len(final_clusters)} clusters.") + + # Draw based on final_clusters + for i, cluster_points in enumerate(final_clusters): + if not cluster_points: continue # Should not happen + + if len(cluster_points) == 1: + x, y = cluster_points[0] + buffer = 10 + rect_pt1_x = max(0, x - buffer) + rect_pt1_y = max(0, y - buffer) + rect_pt2_x = min(self.img_width - 1, x + buffer) + rect_pt2_y = min(self.img_height - 1, y + buffer) + + rects.append((rect_pt1_x, rect_pt1_y, rect_pt2_x, rect_pt2_y)) + if rect_pt1_x < rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a rectangle with area + cv2.rectangle(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White + elif rect_pt1_x == rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a vertical line + cv2.line(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + elif rect_pt1_y == rect_pt2_y and rect_pt1_x < rect_pt2_x: + # It's a horizontal line + cv2.line(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + elif rect_pt1_x == rect_pt2_x and rect_pt1_y == rect_pt2_y and len(cluster_points) > 0 : + # All points in cluster are the same single point (after clipping or originally) + # This case should ideally be caught by len(cluster_points) == 1, but as a fallback for multi-point clusters collapsing to one point: + cv2.circle(pub_image, (rect_pt1_x, rect_pt1_y), 6, (255,255,255), -1) # Draw a filled circle + else: + print(f"Cluster {i} BBox has zero/negative area and is not a simple line. rect_pt1_x={rect_pt1_x}, rect_pt2_x={rect_pt2_x}, rect_pt1_y={rect_pt1_y}, rect_pt2_y={rect_pt2_y}") + + else: # Cluster has multiple points + all_x_coords = [p[0] for p in cluster_points] + all_y_coords = [p[1] for p in cluster_points] + + min_x = min(all_x_coords) + max_x = max(all_x_coords) + min_y = min(all_y_coords) + max_y = max(all_y_coords) + + rect_pt1_x = max(0, min_x) + rect_pt1_y = max(0, min_y) + rect_pt2_x = min(self.img_width - 1, max_x) + rect_pt2_y = min(self.img_height - 1, max_y) + + rects.append((rect_pt1_x, rect_pt1_y, rect_pt2_x, rect_pt2_y)) + if rect_pt1_x < rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a rectangle with area + cv2.rectangle(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White + elif rect_pt1_x == rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a vertical line + cv2.line(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + elif rect_pt1_y == rect_pt2_y and rect_pt1_x < rect_pt2_x: + # It's a horizontal line + cv2.line(pub_image, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + elif rect_pt1_x == rect_pt2_x and rect_pt1_y == rect_pt2_y and len(cluster_points) > 0 : + # All points in cluster are the same single point (after clipping or originally) + # This case should ideally be caught by len(cluster_points) == 1, but as a fallback for multi-point clusters collapsing to one point: + cv2.circle(pub_image, (rect_pt1_x, rect_pt1_y), 6, (255,255,255), -1) # Draw a filled circle + else: + print(f" Cluster {i} BBox has zero/negative area and is not a simple line. rect_pt1_x={rect_pt1_x}, rect_pt2_x={rect_pt2_x}, rect_pt1_y={rect_pt1_y}, rect_pt2_y={rect_pt2_y}") + + try: + self.obs_image_pub.publish(self.bridge.cv2_to_imgmsg(pub_image, "bgr8")) + except CvBridgeError as e: + rospy.logerr("CvBridge Error (publishing cones): {0}".format(e)) + + cv2.imwrite("occupancy_grid_new.png", pub_image) + return rects + + def compute_cone_rectangles( + self, + cones_gnss_list, + cone_cluster_pixel_threshold=100, + single_buffer=10 + ): + """ + Returns a list of (x1, y1, x2, y2) white-box coordinates for the given cones. + """ + # 1) GNSS → image pixels + valid = [] + for lon,lat in cones_gnss_list: + x = int(self.img_width * (lon - self.lon_start_l) / self.lon_scale) + y = int(self.img_height - self.img_height * (lat - self.lat_start_bt) / self.lat_scale) + if 0 <= x < self.img_width and 0 <= y < self.img_height: + valid.append((x, y)) + + if not valid: + return [] + + # 2) Single-cone case + if len(valid) == 1: + x, y = valid[0] + x1 = max(0, x - single_buffer) + y1 = max(0, y - single_buffer) + x2 = min(self.img_width - 1, x + single_buffer) + y2 = min(self.img_height - 1, y + single_buffer) + return [(x1, y1, x2, y2)] if x1 < x2 and y1 < y2 else [] + + # 3) Greedy clustering + thresh2 = cone_cluster_pixel_threshold ** 2 + pts = valid[:] + clusters = [] + while pts: + seed = pts.pop(0) + cluster = [seed] + queue = [seed] + head = 0 + while head < len(queue): + cx, cy = queue[head]; head += 1 + next_pts = [] + for ox, oy in pts: + if (cx - ox)**2 + (cy - oy)**2 <= thresh2: + cluster.append((ox, oy)) + queue.append((ox, oy)) + else: + next_pts.append((ox, oy)) + pts = next_pts + clusters.append(cluster) + + # 4) Build rectangles + rects = [] + for cl in clusters: + if len(cl) == 1: + x, y = cl[0] + x1 = max(0, x - single_buffer) + y1 = max(0, y - single_buffer) + x2 = min(self.img_width - 1, x + single_buffer) + y2 = min(self.img_height - 1, y + single_buffer) + else: + xs = [p[0] for p in cl]; ys = [p[1] for p in cl] + x1, x2 = max(0, min(xs)), min(self.img_width - 1, max(xs)) + y1, y2 = max(0, min(ys)), min(self.img_height - 1, max(ys)) + if x1 < x2 and y1 < y2: + rects.append((x1, y1, x2, y2)) + return rects + + def gnss_to_image_with_heading(self, lon, lat, heading): + + lon_x = int(self.img_width * (lon - self.lon_start_l) / self.lon_scale) + lat_y = int( + self.img_height + - self.img_height * (lat - self.lat_start_bt) / self.lat_scale + ) + lon_xd, lat_yd = self.image_heading(lon_x, lat_y, heading) + + pub_image = np.copy(self.map_image) + + if ( + lon_x >= 0 + and lon_x <= self.img_width + and lon_xd >= 0 + and lon_xd <= self.img_width + and lat_y >= 0 + and lat_y <= self.img_height + and lat_yd >= 0 + and lat_yd <= self.img_height + ): + cv2.arrowedLine(pub_image, (lon_x, lat_y), (lon_xd, lat_yd), (0, 0, 255), 2) + cv2.circle(pub_image, (lon_x, lat_y), 12, (0, 0, 255), 2) + + if DEBUG: + ## Debug check if you can convert back to GNSS + tmp_lat, tmp_lon = self.image_to_gnss(lon_x, lat_y) + print(f'Lat: {self.lat}, Lon: {self.lon}, Converted Lat: {tmp_lat}, Converted Lon: {tmp_lon}') + ## End Debug check + try: + # Convert OpenCV image to ROS image and publish + self.map_image_pub.publish(self.bridge.cv2_to_imgmsg(pub_image, "bgr8")) + except CvBridgeError as e: + rospy.logerr("CvBridge Error: {0}".format(e)) + + def gnss_to_image(self, lon, lat): + lon_x = int(self.img_width * (lon - self.lon_start_l) / self.lon_scale) + lat_y = int( + self.img_height + - self.img_height * (lat - self.lat_start_bt) / self.lat_scale + ) + pub_image = np.copy(self.map_image) + + if ( + lon_x >= 0 + and lon_x <= self.img_width + and lat_y >= 0 + and lat_y <= self.img_height + ): + cv2.circle(pub_image, (lon_x, lat_y), 12, (0, 0, 255), 2) + + try: + # Convert OpenCV image to ROS image and publish + self.map_image_pub.publish(self.bridge.cv2_to_imgmsg(pub_image, "bgr8")) + except CvBridgeError as e: + rospy.logerr("CvBridge Error: {0}".format(e)) + + def gnss_to_image_coords(self, lon, lat): + + lon_x = int(self.img_width * (lon - self.lon_start_l) / self.lon_scale) + lat_y = int( + self.img_height + - self.img_height * (lat - self.lat_start_bt) / self.lat_scale + ) + if DEBUG: + print(f"GNSS ({lat}, {lon}) → pixel ({lon_x}, {lat_y})") + return lon_x, lat_y diff --git a/GEMstack/onboard/planning/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py new file mode 100644 index 000000000..2b0f3f987 --- /dev/null +++ b/GEMstack/onboard/planning/pedestrian_yield_logic.py @@ -0,0 +1,239 @@ +from ...state import AgentState, AgentEnum, EntityRelation, EntityRelationEnum, ObjectFrameEnum, VehicleState +from ..component import Component +from typing import List, Dict + +import numpy as np + + +DEBUG = False # Set to False to disable debug output + +""" Vehicle Configuration """ +GEM_MODEL = 'e4' # e2 or e4 +buffer_x, buffer_y = 3, 1 +if GEM_MODEL == 'e2': + # e2 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e2.urdf + size_x, size_y = 2.53, 1.35 # measured from base_link.STL + lx_f2r = 0.88 + 0.87 # distance between front axle and rear axle, from gem_e2.urdf wheel_links + ly_axle = 0.6 * 2 # length of axle + l_rear_axle_to_front = 1.28 + 0.87 # measured from base_link.STL + l_rear_axle_to_rear = size_x - l_rear_axle_to_front +elif GEM_MODEL == 'e4': + # e4 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e4.urdf + size_x, size_y = 3.35, 1.35 # measured from base_link.STL + lx_f2r = 1.2746 + 1.2904 # distance between front axle and rear axle, from gem_e4.urdf wheel_links + ly_axle = 0.6545 * 2 # length of front and rear axles, from gem_e4.urdf wheel_links + l_rear_axle_to_front = 1.68 + 1.29 # measured from base_link.STL + l_rear_axle_to_rear = size_x - l_rear_axle_to_front +# add buffer to outer dimensions, defined by half of the x, y buffer area dimensions +buffer_front = l_rear_axle_to_front + buffer_x +buffer_rear = -(l_rear_axle_to_rear + buffer_x) +buffer_left = size_y / 2 + buffer_y +buffer_right = -(size_y / 2 + buffer_y) +# comfortable deceleration for lookahead time calculation +comfort_decel = 2.0 + + +class PedestrianYielder(Component): + """Yields for all pedestrians in the scene. + + Result is stored in the relations graph. + """ + + def rate(self): + return None + + def state_inputs(self): + return ['agents', 'vehicle'] + + def state_outputs(self): + return ['relations'] + + def update(self, agents: Dict[str, AgentState], vehicle: VehicleState) -> List[EntityRelation]: + if DEBUG: + print("PedestrianYielder vehicle pose:", vehicle.pose, vehicle.v) + res = [] + for n, a in agents.items(): + if DEBUG: + print(f"[DEBUG] PedestrianYielder.update: Agent:", a.pose, a.velocity) + + """ collision estimation based on agent states in vehicle frame """ + if a.type == AgentEnum.PEDESTRIAN: + check, t_min, min_dist, pt_min = check_collision_in_vehicle_frame(a, vehicle) + if DEBUG: + print( + f"[DEBUG] ID {n}, relation:{check}, minimum distance:{min_dist}, time to min_dist: {t_min}, point of min_dist:{pt_min}") + # collision may occur, slow down + if check == 'YIELD': + res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) + # collision in a short time, emergency stop + elif check == 'STOP': + res.append(EntityRelation(type=EntityRelationEnum.STOPPING_AT, obj1='', obj2=n)) + + return res + + +""" Planning in vehicle frame without waypoints """ +def check_collision_in_vehicle_frame(agent: AgentState, vehicle: VehicleState): + xp, yp = agent.pose.x, agent.pose.y + vx, vy = agent.velocity[:2] + xv = vehicle.pose.x + yv = vehicle.pose.y + yaw = vehicle.pose.yaw + vel = vehicle.v + # time to stop from current velocity with comfortable deceleration + t_look = vel / comfort_decel + # calculate relative pedestrian position and velocity in vehicle frame + if agent.pose.frame == ObjectFrameEnum.CURRENT: + # xp, yp, vx, vy are already in vehicle frame + pass + elif agent.pose.frame == ObjectFrameEnum.START: + # convert xp, yp, vx, vy to vehicle frame + R = np.array([[np.cos(yaw), np.sin(yaw)], [-np.sin(yaw), np.cos(yaw)]], dtype=np.float32) + dx, dy = xp - xv, yp - yv + dvx, dvy = vx - vel * np.cos(yaw), vy - vel * np.sin(yaw) + # pedestrian pose and velocity in vehicle frame + xp, yp = np.dot(R, np.array([dx, dy])) + vx, vy = np.dot(R, np.array([dvx, dvy])) + + # If pedestrian already in buffer area + if buffer_rear <= xp <= buffer_front and buffer_right <= yp <= buffer_left: + return 'STOP', 0, 0, (xp, yp) + t_min, min_dist, pt_min = find_min_distance_and_time(xp, yp, vx, vy) + # if the minimum distance between the position and the buffer area is less than 0, than a collision is expected + if min_dist is not None: + if min_dist <= 0: + if t_min <= 0: + check = 'STOP' + elif t_min <= t_look: + check = 'YIELD' + else: + check = 'RUN' + else: + check = 'RUN' + else: + check = 'RUN' + return check, t_min, min_dist, pt_min + + +def find_min_distance_and_time(xp, yp, vx, vy): + # path function: Ax + By + C = vy * x - vx * y + (yp * vx - xp * vy) = 0 + vx = vx if vx != 0 else 1e-6 + vy = vy if vy != 0 else 1e-6 + A = vy + B = -vx + C = yp * vx - xp * vy + + def point_to_line(x0, y0, A, B, C): + # calculate the shortest distance from a point (x0, y0) to the line Ax + By + C = 0 """ + numerator = abs(A * x0 + B * y0 + C) + denominator = np.sqrt(A ** 2 + B ** 2) + x_foot = x0 - (A * (A * x0 + B * y0 + C)) / denominator + y_foot = y0 - (B * (A * x0 + B * y0 + C)) / denominator + dist = numerator / denominator if denominator != 0 else np.inf + return dist, (x_foot, y_foot) + + def point_dist(x1, y1, x2, y2): + return np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) + + """ Compute intersections at the front, left and right edge """ + if xp >= buffer_front: + # front edge intersection: x = x_buff = xt = xp + vx * t_f + t_f = (buffer_front - xp) / vx + yt = yp + vy * t_f + if t_f < 0: # object moving away with higher speed than vehicle, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_right <= yp <= buffer_left: + min_dist = xp - buffer_front # the distance to the front edge + elif yt > buffer_left: + min_dist = point_dist(xp, yp, buffer_front, buffer_left) # the distance to front left corner + else: + min_dist = point_dist(xp, yp, buffer_front, buffer_right) # the distance to front right corner + else: + if buffer_right <= yt <= buffer_left: # intersect at front edge, is collision + t_min = t_f + min_dist = 0 + pt_min = buffer_front, yt + elif yt > buffer_left: # intersect at front left + if yp <= yt: + min_dist, pt_min = point_to_line(buffer_front, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (buffer_left - yp) / vy + xt = xp + vx * t_l + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, buffer_left + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # intersect at front right + if yp >= yt: + min_dist, pt_min = point_to_line(buffer_front, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # right edge interaction: y = y_buff = yt = yp + vy * t_l + t_r = (buffer_right - yp) / vy + xt = xp + vx * t_r + if buffer_rear <= xt <= buffer_front: # intersect at right edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, buffer_right + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + if yp >= buffer_left: + # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (buffer_left - yp) / vy + xt = xp + vx * t_l + if t_l < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_rear <= xp <= buffer_front: + min_dist = yp - buffer_left # the distance to the left edge + else: + min_dist = point_dist(xp, yp, buffer_rear, buffer_left) # the distance to rear right corner + else: + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, buffer_left + elif xt > buffer_front: + min_dist, pt_min = point_to_line(buffer_front, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + elif yp <= buffer_right: + # right edge interaction: y = -y_buff = yt = yp + vy * t_l + t_r = (buffer_right - yp) / vy + xt = xp + vx * t_r + if t_r < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_rear<= xp <= buffer_front: + min_dist = -yp - buffer_right # the distance to the right edge + else: + min_dist = point_dist(xp, yp, buffer_rear, buffer_right) # the distance to rear right corner + else: + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, buffer_right + elif xt > buffer_front: + min_dist, pt_min = point_to_line(buffer_front, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + elif xp >= buffer_rear: + t_min = 0 + min_dist = -1 + pt_min = xp, yp + else: # rear position, should not be seen by the front camera + t_min = None + min_dist = None + pt_min = None + + return t_min, min_dist, pt_min diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/planning_route.py similarity index 68% rename from GEMstack/onboard/planning/route_planning_component.py rename to GEMstack/onboard/planning/planning_route.py index ef3bf5c07..8f6d398de 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/planning_route.py @@ -1,24 +1,25 @@ import os -from typing import Dict, List +from typing import List import numpy as np + from GEMstack.onboard.component import Component -from GEMstack.state.agent import AgentState from GEMstack.state.mission import MissionEnum from GEMstack.state.all import AllState from GEMstack.state.physical_object import ObjectFrameEnum, ObjectPose from GEMstack.state.route import PlannerEnum, Route, Path from .parking_route_planner import ParkingPlanner from .longitudinal_planning import longitudinal_plan +from GEMstack.utils import settings from GEMstack.state.vehicle import VehicleState +from GEMstack.state.agent import AgentState from GEMstack.state.intent import VehicleIntentEnum -# from .planner import optimized_kinodynamic_rrt_planning -# from .map_utils import load_pgm_to_occupancy_grid -from .rrt_star import RRTStar +from GEMstack.state.mission_plan import MissionPlan, ModeEnum +from GEMstack.state.obstacle import Obstacle, ObstacleMaterialEnum +from .rrt_star_entrypoint import optimized_kinodynamic_rrt_planning from typing import List from ..component import Component -from ...utils import serialization from ...state import Route, ObjectFrameEnum import math import requests @@ -26,13 +27,197 @@ import rospy from sensor_msgs.msg import Image from cv_bridge import CvBridge -# from .occupancy_grid2 import OccupancyGrid2 +from .occupancy_grid import OccupancyGrid import cv2 +###################################################################### +#################### PLANNING HORIZONTAL ############################# +###################################################################### +class PlanningRoutePlanner(Component): + """Reads a route from disk and returns it as the desired route.""" + def __init__(self): + print("Route Planning Component init") + self.planner = None + self.route = None + self.bridge = CvBridge() + self.occupancy_grid = OccupancyGrid() + self.img_pub = rospy.Publisher("/occupancy_grid", Image, queue_size=1) + self.previous_obstacles = None + self.frame = None + self.mode = settings.get("run.mode") + + def state_inputs(self): + return ["vehicle", "mission_plan", "obstacles"] + + def state_outputs(self) -> List[str]: + return ['route'] + + def rate(self): + return 10.0 + + + def update(self, vehicle: VehicleState, mission_plan: MissionPlan, obstacles: Obstacle) -> Route: + + if self.frame is None: + if self.mode == ModeEnum.HARDWARE: + self.frame = ObjectFrameEnum.GLOBAL + else: #simulation + self.frame = ObjectFrameEnum.ABSOLUTE_CARTESIAN + + if self.previous_obstacles is None: + self.previous_obstacles = len(obstacles) + + # Convert vehicle pose to global frame + vehicle_global_pos = vehicle.pose.to_frame(self.frame, start_pose_abs=mission_plan.start_vehicle_pose) + + # Convert vehicle pose to image coordinates + self.occupancy_grid.gnss_to_image(vehicle_global_pos.x, vehicle_global_pos.y) + + obstacles_global_poses = [] + if DEBUG: + print("Number of Detected Obstacles: ", len(obstacles)) + for n, o in obstacles.items(): + print("Obstacle: ", o) + if o.material == ObstacleMaterialEnum.TRAFFIC_CONE: + obstacle_global_pose = o.pose.to_frame(self.frame, start_pose_abs=mission_plan.start_vehicle_pose) + obstacles_global_poses.append((obstacle_global_pose.x, obstacle_global_pose.y)) + + + rects = self.occupancy_grid.draw_obstacle_cones(obstacles_global_poses) + + if DEBUG: + print("Route Planner's mission:", mission_plan.planner_type.value) + print("type of mission plan:", type(PlannerEnum.RRT_STAR)) + print("Vehicle x:", vehicle.pose.x) + print("Vehicle y:", vehicle.pose.y) + print("Vehicle yaw:", vehicle.pose.yaw) + + if mission_plan.planner_type.value == PlannerEnum.PARKING.value: + if DEBUG: + print("Route Planning in PARKING mode") + base_path = os.path.dirname(__file__) + file_path = os.path.join(base_path, "../../knowledge/routes/forward_15m_extra.csv") + waypoints = np.loadtxt(file_path, delimiter=',', dtype=float) + if waypoints.shape[1] == 3: + waypoints = waypoints[:,:2] + self.route = Route(frame=ObjectFrameEnum.START,points=waypoints.tolist()) + elif mission_plan.planner_type.value == PlannerEnum.RRT_STAR.value: + if DEBUG: + print("Route Planning in RRT mode") + + ## Step 1: Convert vehicle pose to global frame + vehicle_global_pose = vehicle.pose.to_frame( + self.frame, start_pose_abs=mission_plan.start_vehicle_pose + ) + + # Plot motion image + self.occupancy_grid.gnss_to_image_with_heading( + vehicle_global_pose.x, vehicle_global_pose.y, vehicle_global_pose.yaw + ) + ## Step 2: Get start image coordinates aka position of vehicle in image + start_x, start_y = self.occupancy_grid.gnss_to_image_coords( + vehicle_global_pose.x, vehicle_global_pose.y + ) + + start_yaw = vehicle.pose.yaw + math.pi + if DEBUG: + print("Start image coordinates", start_x, start_y, "yaw", start_yaw) + + ## Step 3. Convert goal to global frame + goal_global_pose = mission_plan.goal_vehicle_pose.to_frame( + self.frame, start_pose_abs=mission_plan.start_vehicle_pose + ) + + goal_x, goal_y = self.occupancy_grid.gnss_to_image_coords( + goal_global_pose.x, goal_global_pose.y + ) + + goal_yaw = start_yaw + if DEBUG: + print("Goal image coordinates", goal_x, goal_y, "yaw", goal_yaw) + + script_dir = os.path.dirname(os.path.abspath(__file__)) + map_path = os.path.join(script_dir, "highbay_image.pgm") + map_img = cv2.imread(map_path, cv2.IMREAD_UNCHANGED) + + for i, (rect_pt1_x, rect_pt1_y, rect_pt2_x, rect_pt2_y) in enumerate(rects): + if rect_pt1_x < rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a rectangle with areaf + cv2.rectangle(map_img, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White + if DEBUG: + print(f"[DEBUG-ME] Drew WHITE rectangle for cluster {i}") + elif rect_pt1_x == rect_pt2_x and rect_pt1_y < rect_pt2_y: + # It's a vertical line + cv2.line(map_img, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + if DEBUG: + print(f"[DEBUG-ME] Drew WHITE vertical line for cluster {i}") + elif rect_pt1_y == rect_pt2_y and rect_pt1_x < rect_pt2_x: + # It's a horizontal line + cv2.line(map_img, (rect_pt1_x, rect_pt1_y), (rect_pt2_x, rect_pt2_y), (255, 255, 255), 6) # White line + if DEBUG: + print(f"[DEBUG-ME] Drew WHITE horizontal line for cluster {i}") + else: + if DEBUG: + print(f"[DEBUG-ME] Cluster {i} BBox has zero/negative area and is not a simple line. rect_pt1_x={rect_pt1_x}, rect_pt2_x={rect_pt2_x}, rect_pt1_y={rect_pt1_y}, rect_pt2_y={rect_pt2_y}") + + cv2.imwrite("highbay_image_with_cones.pgm", map_img) + occupancy_grid = (map_img > 0).astype( + np.uint8 + ) + cv2.imwrite("occupancy_grid_after_>0.pgm", occupancy_grid * 255) + self.t_last = None + self.bounds = (0, occupancy_grid.shape[1]) + start_w = [start_x, start_y, start_yaw] + goal_w = [goal_x, goal_y, goal_yaw] + + if self.route == None or len(obstacles) != self.previous_obstacles: + + self.previous_obstacles = len(obstacles) + path = optimized_kinodynamic_rrt_planning(start_w, goal_w, occupancy_grid) + waypoints = [] + for i in range(len(path)): + x, y, theta = path[i] + waypoints = [] + for i in range(0, len(path), 10): + x, y, theta = path[i] + # Converts pixel to global frame. + waypoint_lat, waypoint_lon = self.occupancy_grid.image_to_gnss(x, y) + + # Convert global to start frame + waypoint_global_pose = ObjectPose( + frame=self.frame, + t=mission_plan.start_vehicle_pose.t, + x=waypoint_lon, + y=waypoint_lat, + yaw=theta, + ) + waypoint_start_pose = waypoint_global_pose.to_frame( + ObjectFrameEnum.START, start_pose_abs=mission_plan.start_vehicle_pose + ) + waypoints.append((waypoint_start_pose.x, waypoint_start_pose.y)) + + self.route = Route( + frame=ObjectFrameEnum.START, points=waypoints) + if DEBUG: + print("Route points in start frame: ", waypoints) + return self.route + + else: + print("Unknown mode") + + return self.route + + + +###################################################################### +#################### INSPECTION VERTICAL ############################# +###################################################################### + # Constants for planning ORIGIN_PX = (190, 80) SCALE_PX_PER_M = 6.5 +DEBUG = True # Functions to dynamically calculate a circular or linear path around the inspection area @@ -252,7 +437,7 @@ def __init__(self, state_machine, frame: str = "start"): self.start = [0, 0] self.bridge = CvBridge() self.img_pub = rospy.Publisher("/image_with_car_xy", Image, queue_size=1) - self.occupancy_grid = OccupancyGrid2() + self.occupancy_grid = OccupancyGrid() self.planned_path_already = False self.x = None diff --git a/GEMstack/onboard/planning/planning_trajectory.py b/GEMstack/onboard/planning/planning_trajectory.py new file mode 100644 index 000000000..9ea82d3cf --- /dev/null +++ b/GEMstack/onboard/planning/planning_trajectory.py @@ -0,0 +1,690 @@ +from typing import List, Tuple +import math +from ..component import Component +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum +from ...utils import serialization, settings +from ...mathutils import transforms +import numpy as np + +DEBUG = False # Set to False to disable debug output + +def scurve(x): + x = np.clip(x, 0.0, 1.0) + return 6 * x**5 - 15 * x**4 + 10 * x**3 + +def jerk_limited_brake(v0: float, a_max: float, j_max: float, dt: float = 0.01): + tj = a_max / j_max + T_flat = (v0 - a_max * tj) / a_max if v0 > a_max * tj else 0.0 + T = 2 * tj + T_flat + + times = [0.0] + velocities = [v0] + positions = [0.0] + t, v, s = 0.0, v0, 0.0 + while v > 0: + if t < tj: + a = -j_max * t + elif t < tj + T_flat: + a = -a_max + elif t < T: + t3 = t - (T - tj) + a = -a_max + j_max * t3 + else: + a = 0.0 + v_next = max(0.0, v + a * dt) + s_next = s + 0.5 * (v + v_next) * dt + t += dt + times.append(t) + velocities.append(v_next) + positions.append(s_next) + v, s = v_next, s_next + return times, velocities, positions + +def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) -> List[Tuple[float, float]]: + if not points: + return [] + if len(points) == 1: + return points.copy() + + dense_points = [points[0]] + for i in range(len(points) - 1): + p0 = points[i] + p1 = points[i + 1] + dx = p1[0] - p0[0] + dy = p1[1] - p0[1] + seg_length = math.hypot(dx, dy) + n_interp = int(round(seg_length * density)) + for j in range(1, n_interp + 1): + fraction = j / (n_interp + 1) + x_interp = p0[0] + fraction * dx + y_interp = p0[1] + fraction * dy + dense_points.append((x_interp, y_interp)) + dense_points.append(p1) + return dense_points + +def compute_cumulative_distances(points: List[List[float]]) -> List[float]: + s_vals = [0.0] + for i in range(1, len(points)): + dx = points[i][0] - points[i - 1][0] + dy = points[i][1] - points[i - 1][1] + ds = math.hypot(dx, dy) + s_vals.append(s_vals[-1] + ds) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + return s_vals + +def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed, planner_type="standard"): + path_normalized = path.arc_length_parameterize() + points = list(path_normalized.points) + dense_points = generate_dense_points(points) + s_vals = compute_cumulative_distances(dense_points) + L = s_vals[-1] # Total path length + stopping_distance = (current_speed ** 2) / (2 * deceleration) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + print("[DEBUG] longitudinal_plan: Total path length L =", L) + print("[DEBUG] longitudinal_plan: Braking distance needed =", stopping_distance) + + if stopping_distance > L: # Case where there is not enough stopping distance to stop before path ends (calls emergency brake) + return longitudinal_brake(path, deceleration, current_speed, planner_type=planner_type) + + if current_speed > max_speed: # Case where car is exceeding the max speed so we need to slow down (do initial slowdown) + if planner_type == "scurve": + print("IN CURRENT SPEED > MAX SPEED CASE") + + if DEBUG: + print(f"[DEBUG] Handling case where current_speed ({current_speed:.2f}) > max_speed ({max_speed:.2f})") + + # PHASE 1: Initial deceleration from current_speed to max_speed + s_decel1 = (current_speed ** 2 - max_speed ** 2) / (2 * deceleration) + t_decel1 = (current_speed - max_speed) / deceleration + + # PHASE 3: Final deceleration from max_speed to 0 + s_decel3 = (max_speed ** 2) / (2 * deceleration) + t_decel3 = max_speed / deceleration + + # PHASE 2: Cruise at max_speed for remaining path + s_cruise = max(0.0, L - s_decel1 - s_decel3) + t_cruise = s_cruise / max_speed if max_speed > 0 else 0.0 + + if planner_type == "standard": + # Initial deceleration phase to reach max_speed + initial_decel_distance = (current_speed ** 2 - max_speed ** 2) / (2 * deceleration) + initial_decel_time = (current_speed - max_speed) / deceleration + remaining_distance = L - initial_decel_distance + + if DEBUG: + print(f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") + print(f"[DEBUG] Remaining distance after reaching max_speed: {remaining_distance:.2f}") + + # Calculate final deceleration distance needed to stop from max_speed + final_decel_distance = (max_speed ** 2) / (2 * deceleration) + cruise_distance = remaining_distance - final_decel_distance + + if DEBUG: + print(f"[DEBUG] Phase 2 - Cruise: distance = {cruise_distance:.2f}") + print(f"[DEBUG] Phase 3 - Final Decel: distance = {final_decel_distance:.2f}") + + times = [] + for s in s_vals: + if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed + v = math.sqrt(current_speed ** 2 - 2 * deceleration * s) + t = (current_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Initial Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed + s_in_cruise = s - initial_decel_distance + t = initial_decel_time + s_in_cruise / max_speed + if DEBUG: # Print every 10m + print(f"[DEBUG] Cruise: s = {s:.2f}, v = {max_speed:.2f}, t = {t:.2f}") + else: # Phase 3: Final deceleration to stop + s_in_final_decel = s - (initial_decel_distance + cruise_distance) + v = math.sqrt(max(max_speed ** 2 - 2 * deceleration * s_in_final_decel, 0.0)) + t = initial_decel_time + cruise_distance / max_speed + (max_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Final Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + times.append(t) + + if DEBUG: + print("[DEBUG] Trajectory complete: Three phases executed") + print(f"[DEBUG] Total time: {times[-1]:.2f}") + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + else: # scurve planner + times = [] + t = 0.0 + prev_s = 0.0 + for s in s_vals: + ds = s - prev_s + if s <= s_decel1: + # Phase 1: Decel from current_speed to max_speed + ratio = s / s_decel1 if s_decel1 > 0 else 1.0 + v = current_speed + (max_speed - current_speed) * scurve(ratio) + elif s <= s_decel1 + s_cruise: + # Phase 2: Cruise at max speed + v = max_speed + elif s <= s_decel1 + s_cruise + s_decel3: + s_in_final = s - (s_decel1 + s_cruise) + ratio = s_in_final / s_decel3 if s_decel3 > 0 else 1.0 + v = max_speed * (1.0 - scurve(ratio)) + else: + if DEBUG: + print("[DEBUG] Trajectory complete: Three phases executed") + print(f"[DEBUG] Total time: {times[-1]:.2f}") + return Trajectory(frame=path.frame, points=dense_points[:len(times)], times=times) + + v = max(1e-3, v) + dt = ds / v + t += dt + times.append(t) + prev_s = s + + return Trajectory(frame=path.frame, points=dense_points[:len(times)], times=times) + + if acceleration <= 0: + if DEBUG: + print(f"[DEBUG] No acceleration allowed. Current speed: {current_speed:.2f}") + # Pure deceleration phase + s_decel = (current_speed ** 2) / (2 * deceleration) + T_decel = current_speed / deceleration + if DEBUG: + print(f"[DEBUG] Will maintain speed until s_decel: {s_decel:.2f}") + print(f"[DEBUG] Total deceleration time will be: {T_decel:.2f}") + times = [] + for s in s_vals: + if s <= L - s_decel: # Maintain current speed until deceleration point + t_point = s / current_speed + if DEBUG: + print(f"[DEBUG] Constant Speed Phase: s = {s:.2f}, v = {current_speed:.2f}, t = {t_point:.2f}") + else: # Deceleration phase + s_in_decel = s - (L - s_decel) + v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s_in_decel, 0.0)) + t_point = (L - s_decel) / current_speed + (current_speed - v) / deceleration + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + times.append(t_point) + return Trajectory(frame=path.frame, points=dense_points, times=times) + + # Determine max possible peak speed given distance + v_peak_possible = math.sqrt( + (2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) + v_target = min(max_speed, v_peak_possible) + + if DEBUG: + print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) + + # Compute acceleration phase + s_accel = max(0.0, (v_target ** 2 - current_speed ** 2) / (2 * acceleration)) + t_accel = max(0.0, (v_target - current_speed) / acceleration) + + # Compute deceleration phase + s_decel = max(0.0, (v_target ** 2) / (2 * deceleration)) + t_decel = max(0.0, v_target / deceleration) + + # Compute cruise phase + s_cruise = max(0.0, L - s_accel - s_decel) + t_cruise = s_cruise / v_target if v_target > 0 else 0.0 + + if DEBUG: + print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) + print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) + print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) + + if planner_type == "standard": + times = [] + for s in s_vals: + if s <= s_accel: # Acceleration phase + v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) + t_point = (v - current_speed) / acceleration + if DEBUG: + print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + elif s <= s_accel + s_cruise: # Cruise phase + t_point = t_accel + (s - s_accel) / v_target + if DEBUG: + print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") + else: # Deceleration phase + s_decel_phase = s - s_accel - s_cruise + v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) + t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration + if t_point < times[-1]: # Ensure time always increases + t_point = times[-1] + 0.01 # Small time correction step + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") + times.append(t_point) + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + else: # scurve planner + times = [] + t = 0.0 + prev_s = 0.0 + beyond_stop = False + + for s in s_vals: + ds = s - prev_s + if s <= s_accel: # Acceleration phase + ratio = s / s_accel if s_accel > 0 else 1.0 + v = current_speed + (v_target - current_speed) * scurve(ratio) + if DEBUG: + print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}") + elif s <= s_accel + s_cruise: # Cruise phase + v = v_target + if DEBUG: + print(f"[DEBUG] Cruise Phase: s = {s:.2f}") + elif s <= s_accel + s_cruise + s_decel: # Deceleration phase + s_decel_phase = s - s_accel - s_cruise + ratio = s_decel_phase / s_decel if s_decel > 0 else 1.0 + v = v_target * (1.0 - scurve(ratio)) + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}") + else: + beyond_stop = True + + v = max(1e-3, v) + if beyond_stop: + times.append(times[-1] + 0.01) + else: + dt = ds / v + t += dt + times.append(t) + prev_s = s + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float, emergency_decel: float = 8.0, planner_type="standard"): + # Vehicle already stopped - maintain position + if current_speed <= 0: + print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) + return Trajectory( + frame=path.frame, + points=[path.points[0]] * len(path.points), + times=[float(i) for i in range(len(path.points))] + ) + + # Get total path length + path_length = sum( + np.linalg.norm(np.array(path.points[i + 1]) - np.array(path.points[i])) + for i in range(len(path.points) - 1) + ) + + # Calculate stopping distance with normal deceleration + T_stop_normal = current_speed / deceleration + s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) + + # Check if emergency braking is needed + if s_stop_normal > path_length: + if DEBUG: + print("[DEBUG] longitudinal_brake: Emergency braking needed!") + print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") + print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") + + decel_to_use = emergency_decel + else: + if DEBUG: + print("[DEBUG] longitudinal_brake: Normal braking sufficient") + decel_to_use = deceleration + + T_stop = current_speed / decel_to_use + s_stop = current_speed * T_stop - 0.5 * decel_to_use * (T_stop ** 2) + + if DEBUG: + if planner_type == "standard": + print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") + print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") + else: + print(f"[DEBUG] Using deceleration = {decel_to_use:.2f}, time to stop = {T_stop:.2f}s, distance = {s_stop:.2f}m") + + # Generate time points (use more points for smoother trajectory) + num_points = max(len(path.points), 50) + times = np.linspace(0, T_stop, num_points) + + if planner_type == "scurve": + velocities = [current_speed * (1.0 - scurve(t/T_stop)) for t in times] + positions = [] + s = 0.0 + for i in range(1, num_points): + dt = times[i] - times[i-1] + v_avg = 0.5 * (velocities[i] + velocities[i - 1]) + s += v_avg * dt + positions.append(s) + + points = [] + for d in positions: + d_clamped = min(d, path_length) + points.append(path.eval(d_clamped)) + + times = times[1:] + return Trajectory(frame=path.frame, points=points, times=times) + else: + # Calculate distances at each time point using physics equation + distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) + + # Generate points along the path + points = [] + for d in distances: + if d <= path_length: + points.append(path.eval(d)) + else: + points.append(path.eval(d)) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") + print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") + + return Trajectory(frame=path.frame, points=points, times=times.tolist()) + +class QuinticHermiteSplinePlanner: + """ + Core quintic-Hermite engine: given coarse 2D or 3D waypoints + (x,y[,heading]) builds a C2-continuous spline and samples it at fixed Δt. + """ + def __init__(self, v_des: float = 1.0, dt: float = 0.02): + self.v_des = v_des + self.dt = dt + + def _compute_headings(self, pts: np.ndarray) -> np.ndarray: + """ + If pts.shape[1] == 3, assume pts[:,2] already contains heading ψ. + Otherwise fall back to finite-difference approximation. + """ + n, d = pts.shape + if d == 3: + # user-provided headings + return pts[:, 2].copy() + # approximate by central differences + headings = np.zeros(n) + for i in range(n): + if i == 0: + delta = pts[1] - pts[0] + elif i == n - 1: + delta = pts[-1] - pts[-2] + else: + delta = pts[i+1] - pts[i-1] + headings[i] = np.arctan2(delta[1], delta[0]) + return headings + + def build(self, + waypoints: List[List[float]] + ) -> Tuple[np.ndarray, np.ndarray]: + """ + waypoints: list of [x,y] or [x,y,ψ] entries + returns (pts_out, t_out), each as an np.ndarray + """ + W = np.array(waypoints, float) # shape = (n,2) or (n,3) + headings = self._compute_headings(W) + tangents = np.stack([np.cos(headings), + np.sin(headings)], axis=1) * self.v_des + pts_out = [] + t_out = [] + t_accum = 0.0 + M = np.array([[1, 1, 1], + [3, 4, 5], + [6, 12, 20]], float) + # build one quintic segment between each adjacent pair + for i in range(len(W) - 1): + p0, p1 = W[i,0:2], W[i+1,0:2] + m0, m1 = tangents[i], tangents[i+1] + L = np.linalg.norm(p1 - p0) + T = (L / self.v_des) if self.v_des > 0 else 0.0 + # Hermite coefficients a0..a5 + a0 = p0 + a1 = m0 * T + a2 = np.zeros(2) + RHS = np.vstack([ + p1 - (a0 + a1 + a2), + m1 * T - ( a1 + 2*a2), + np.zeros(2) - ( 2*a2) + ]) # shape = (3,2) + # solve for a3,a4,a5 + a3, a4, a5 = np.linalg.solve(M, RHS) + # sample + if T > 0: + t_samples = np.arange(0.0, T, self.dt) + else: + t_samples = np.array([0.0]) + for tt in t_samples: + s = tt / T if T > 0 else 0.0 + p = (a0 + + a1 * s + + a2 * s**2 + + a3 * s**3 + + a4 * s**4 + + a5 * s**5) + pts_out.append(p.tolist()) + t_out.append(t_accum + tt) + t_accum += T + # append very last waypoint + pts_out.append(W[-1,0:2].tolist()) + t_out.append(t_accum) + return np.array(pts_out), np.array(t_out) + + +class LongitudinalPlanner(Component): + """Follows the given route. Brakes if the ego-vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. + """ + + def __init__(self, accelaration, desired_speed, deceleration, emergency_brake): + self.route_progress = None + self.t_last = None + self.acceleration = accelaration + self.desired_speed = desired_speed + self.deceleration = deceleration + self.emergency_brake = emergency_brake + + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['trajectory'] + + def rate(self): + return 10.0 + + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: t =", t) + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: dt =", dt) + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] LongitudinalPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") + # Determine progress along the route. + if self.route_progress is None: + self.route_progress = 0.0 + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Closest parameter on route =", closest_parameter) + self.route_progress = closest_parameter + # Extract a 10 m segment of the route for planning lookahead. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Route Lookahead =", route_with_lookahead) + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + stay_braking = False + pointSet = set() + for i in range(len(route_with_lookahead.points)): + if tuple(route_with_lookahead.points[i]) in pointSet: + stay_braking = True + break + pointSet.add(tuple(route_with_lookahead.points[i])) + should_brake = any( + r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' + for r in state.relations + ) + should_decelerate = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) if should_brake == False else False + should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: stay_braking =", stay_braking) + print("[DEBUG] LongitudinalPlanner.update: should_brake =", should_brake) + print("[DEBUG] LongitudinalPlanner.update: should_accelerate =", should_accelerate) + print("[DEBUG] LongitudinalPlanner.update: should_decelerate =", should_decelerate) + if stay_braking: + traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0, planner_type="standard") + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Using longitudinal_brake (stay braking).") + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v, planner_type="standard") + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Using longitudinal_brake.") + elif should_decelerate: + traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v, planner_type="standard") + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Using longitudinal_brake.") + elif should_accelerate: + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, self.desired_speed, curr_v, planner_type="standard") + if DEBUG: + print("[DEBUG] LongitudinalPlanner.update: Using longitudinal_plan (accelerate).") + else: + # Maintain current speed if not accelerating or braking. + traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v, planner_type="standard") + if DEBUG: + print( + "[DEBUG] LongitudinalPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + self.t_last = t + if DEBUG: + print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) + print("[DEBUG] LongitudinalPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj + +class QuinticSplineScurveTrajectoryPlanner(Component): + """Follows the given route. Brakes if the ego-vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. + """ + def __init__(self, accelaration, desired_speed, deceleration, emergency_brake): + self.route_progress = None + self.t_last = None + self.acceleration = accelaration + self.desired_speed = desired_speed + self.deceleration = deceleration + self.emergency_brake = emergency_brake + self.lookahead_dist = 10.0 + self.spline_dt = 0.02 + self._spline = QuinticHermiteSplinePlanner(v_des=self.desired_speed, dt=self.spline_dt) + + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['trajectory'] + + # Technically doesn't need to be specified since base class has same implementation + def rate(self): + return 10.0 + + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: t =", t) + + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: dt =", dt) + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") + + # Determine progress along the route. + if self.route_progress is None: + self.route_progress = 0.0 + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) + self.route_progress = closest_parameter + # Extract a 10 m segment of the route for planning lookahead. + # route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + route_with_lookahead = route.trim(self.route_progress, self.route_progress + self.lookahead_dist) + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead) + raw_points = [list(p) for p in route_with_lookahead.points] + #Generate spline-smoothed path + spline_pts, _ = self._spline.build(raw_points) + spline_path = Path(frame=route_with_lookahead.frame, points=spline_pts.tolist()) + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + stay_braking = False + pointSet = set() + for i in range(len(route_with_lookahead.points)): + if tuple(route_with_lookahead.points[i]) in pointSet: + stay_braking = True + break + pointSet.add(tuple(route_with_lookahead.points[i])) + should_brake = any( + r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' + for r in state.relations + ) + should_decelerate = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) if should_brake == False else False + should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: stay_braking =", stay_braking) + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: should_brake =", should_brake) + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: should_accelerate =", should_accelerate) + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: should_decelerate =", should_decelerate) + if stay_braking: + traj = longitudinal_brake(spline_path, 0.0, 0.0, 0.0, planner_type="scurve") + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") + elif should_brake: + traj = longitudinal_brake(spline_path, self.emergency_brake, curr_v, planner_type="scurve") + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_decelerate: + traj = longitudinal_brake(spline_path, self.deceleration, curr_v, planner_type="scurve") + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_accelerate: + traj = longitudinal_plan(spline_path, self.acceleration, + self.deceleration, self.desired_speed, curr_v, planner_type="scurve") + if DEBUG: + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") + else: + # Maintain current speed if not accelerating or braking. + traj = longitudinal_plan(spline_path, 0.0, self.deceleration, self.desired_speed, curr_v, planner_type="scurve") + if DEBUG: + print( + "[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + self.t_last = t + if DEBUG: + print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) + print("[DEBUG] QuinticSplineScurveTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj diff --git a/GEMstack/onboard/planning/rrt_star_entrypoint.py b/GEMstack/onboard/planning/rrt_star_entrypoint.py new file mode 100644 index 000000000..77b604954 --- /dev/null +++ b/GEMstack/onboard/planning/rrt_star_entrypoint.py @@ -0,0 +1,150 @@ +import os +import argparse +import time +import matplotlib +import matplotlib.pyplot as plt +import math + +from .collision import build_collision_lookup +from .kinodynamic_rrt_star import OptimizedKinodynamicRRT + + + +def visualize_path(occupancy_grid, path, start_world, goal_world, show_headings=True): + """ + Visualize the planned path. + + Args: + occupancy_grid: Binary occupancy grid (1=obstacle, 0=free) + path: List of (x, y, theta) world coordinates + start_world: (x, y, theta) start position in world coordinates + goal_world: (x, y, theta) goal position in world coordinates + show_headings: Whether to show heading arrows along the path + """ + plt.figure(figsize=(12, 12)) + + # Show occupancy grid + plt.imshow(occupancy_grid, cmap='gray' ) + + # Extract path points + if path: + + xs = [p[0] for p in path] + ys = [p[1] for p in path] + + # Plot path + plt.plot(xs, ys, 'g-', linewidth=2) + + # Show heading arrows + if show_headings: + arrow_interval = max(1, len(path) // 20) # Show ~20 arrows along path + arrow_length = 10 + + for i in range(0, len(path), arrow_interval): + x, y, theta = path[i] + dx = arrow_length * math.cos(theta) + dy = arrow_length * math.sin(theta) + plt.arrow(x, y, dx, dy, head_width=3, head_length=6, fc='blue', ec='blue') + + # Show start and goal + start_grid = start_world + goal_grid = goal_world + + plt.scatter(start_grid[0], start_grid[1], color='green', s=100, marker='o', label='Start') + plt.scatter(goal_grid[0], goal_grid[1], color='red', s=100, marker='x', label='Goal') + + # Draw start and goal heading arrows + dx_start = 15 * math.cos(start_grid[2]) + dy_start = 15 * math.sin(start_grid[2]) + plt.arrow(start_grid[0], start_grid[1], dx_start, dy_start, + head_width=5, head_length=10, fc='green', ec='green') + + dx_goal = 15 * math.cos(goal_grid[2]) + dy_goal = 15 * math.sin(goal_grid[2]) + plt.arrow(goal_grid[0], goal_grid[1], dx_goal, dy_goal, + head_width=5, head_length=10, fc='red', ec='red') + + plt.title('Path Planning Result') + plt.legend() + plt.tight_layout() + #save the plt instead of showing it + plt.savefig(f"path_planning_result_{time.time()}.png") + # plt.show() + + +def optimized_kinodynamic_rrt_planning(start_world, goal_world, occupancy_grid, safety_margin=10, + vehicle_width=20, vehicle_length=45.0, step_size=1.0, max_iterations=100000): + """ + Args: + start_world: (x, y, theta) start position in world coordinates + goal_world: (x, y, theta) goal position in world coordinates + occupancy_grid: Binary occupancy grid (1=obstacle, 0=free) + safety_margin: Safety margin in grid cells + vehicle_width: Vehicle width in meters + step_size: Step size for path sampling + turning_radius: Minimum turning radius + max_iterations: Maximum RRT iterations + + Returns: + List of (x, y, theta) world coordinates for the full path + """ + print(f"Starting optimized kinodynamic RRT planning on {occupancy_grid.shape} grid") + t_start = time.time() + + # === STEP 1: Convert world coordinates to grid coordinates === + start_grid = start_world + goal_grid = goal_world + print(f"Start grid: {start_grid}") + print(f"Goal grid: {goal_grid}") + + # === STEP 2: Build collision lookup === + t_lookup = time.time() + collision_lookup = build_collision_lookup(occupancy_grid, safety_margin, vehicle_width) + import matplotlib.pyplot as plt + + # Visualize collision lookup + plt.figure(figsize=(10, 10)) + print(collision_lookup) + plt.imshow(collision_lookup['collision_mask'], cmap="gray", origin="lower") + plt.title("Collision Lookup Table") + plt.xlabel("Grid X") + plt.ylabel("Grid Y") + # plt.colorbar(label="Collision Status") + # plt.show() + plt.savefig("collision_lookup.png") + print(f"Built collision lookup in {time.time() - t_lookup:.2f}s") + + # === STEP 3: Run Optimized Kinodynamic RRT === + t_rrt_start = time.time() + + # Initialize planner with optimized parameters + planner = OptimizedKinodynamicRRT( + occupancy_grid=occupancy_grid, + collision_lookup_table=collision_lookup, + start_pose_tuple=start_grid, + goal_pose_tuple=goal_grid, + max_iterations=max_iterations, + local_sampling_step_size=step_size, + vehicle_width=vehicle_width, + vehicle_length=vehicle_length + # bidirectional=bidirectional + ) + + ## Testing Summoning RRT implementation + # planner = BiRRT(start_grid, goal_grid, 1-occupancy_grid, [0,4384,0,4667]) + # grid_path = planner.search() + t_rrt = time.time() + grid_path = planner.plan(visualize_planning_output=True) + # anim = planner.animate_sampling() + t_rrt_final = time.time() - t_rrt + print(f"RRT planning completed in {t_rrt_final:.2f}s") + + if grid_path is None: + print("Failed to find path") + return None + + # print(f"Conversion completed in {time.time() - t_convert:.2f}s") + print(f"Final path has {len(grid_path)} points") + print(f"Total planning time: {time.time() - t_start:.2f}s") + + return grid_path diff --git a/GEMstack/state/__init__.py b/GEMstack/state/__init__.py index 77d1bea71..85f4789df 100644 --- a/GEMstack/state/__init__.py +++ b/GEMstack/state/__init__.py @@ -17,7 +17,7 @@ 'VehicleIntent','VehicleIntentEnum', 'AgentIntent', 'EntityRelationEnum','EntityRelation','EntityRelationGraph', - 'MissionEnum','MissionObjective', 'MissionPlan', + 'MissionEnum','MissionObjective', 'MissionPlan', 'ModeEnum', 'PlannerEnum', 'Route', 'PredicateValues', @@ -36,6 +36,6 @@ from .relations import EntityRelation, EntityRelationEnum, EntityRelationGraph from .mission import MissionEnum,MissionObjective from .route import Route, PlannerEnum -from .mission import MissionObjective +from .mission_plan import MissionPlan, ModeEnum from .predicates import PredicateValues from .all import AllState diff --git a/GEMstack/state/all.py b/GEMstack/state/all.py index 922f99dc1..b25ef5343 100644 --- a/GEMstack/state/all.py +++ b/GEMstack/state/all.py @@ -8,6 +8,7 @@ from .relations import EntityRelation from .mission import MissionObjective from .mission import MissionObjective +from .mission_plan import MissionPlan from .route import Route from .trajectory import Trajectory from .predicates import PredicateValues diff --git a/GEMstack/state/mission_plan.py b/GEMstack/state/mission_plan.py new file mode 100644 index 000000000..59299eee6 --- /dev/null +++ b/GEMstack/state/mission_plan.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from ..utils.serialization import register +from .route import PlannerEnum +from typing import Optional +from GEMstack.state.physical_object import ObjectPose + +@dataclass +@register +class MissionPlan: + planner_type : PlannerEnum = PlannerEnum.RRT_STAR + goal_vehicle_pose : Optional[ObjectPose] = None + start_vehicle_pose : Optional[ObjectPose] = None + # other mission-specific parameters can be added here + +class ModeEnum: + """Enum for different modes of operation.""" + HARDWARE = "hardware" + SIMULATION = "simulation" diff --git a/GEMstack/state/route.py b/GEMstack/state/route.py index b7b68a600..e9426c00b 100644 --- a/GEMstack/state/route.py +++ b/GEMstack/state/route.py @@ -18,6 +18,16 @@ class PlannerEnum(Enum): PARALLEL_PARKING = 6 # route planning for parallel parking SCANNING = 7 +from enum import Enum + +@dataclass +@register +class PlannerEnum(Enum): + RRT_STAR = 0 #position / yaw in m / radians relative to starting pose of vehicle + HYBRID_A_STAR = 1 #position / yaw in m / radians relative to current pose of vehicle + PARKING = 2 #position in longitude / latitude, yaw=heading in radians with respect to true north (used in GNSS) + + @dataclass @register class Route(Path): diff --git a/launch/inspection.yaml b/launch/inspection.yaml index d29921a6a..f0d835f11 100644 --- a/launch/inspection.yaml +++ b/launch/inspection.yaml @@ -14,7 +14,7 @@ drive: state_estimation : GNSSStateEstimator perception_normalization : StandardPerceptionNormalizer planning: - route_planning_component: + planning_route: type: InspectRoutePlanner args: state_machine: ['IDLE', 'NAV', 'INSPECT', 'FINISH'] @@ -83,4 +83,3 @@ variants: drive: perception: state_estimation: GNSSStateEstimator # Matches your Gazebo GNSS implementation - # visualization: !include "mpl_visualization.yaml" diff --git a/launch/planning.yaml b/launch/planning.yaml new file mode 100644 index 000000000..c049a699b --- /dev/null +++ b/launch/planning.yaml @@ -0,0 +1,148 @@ +description: "Drive the GEM vehicle along a fixed route (currently xyhead_highbay_backlot_p.csv)" +mode: hardware +vehicle_interface: gem_hardware.GEMHardwareInterface +mission_execution: StandardExecutor +# Recovery behavior after a component failure +recovery: + planning: + trajectory_tracking: + type: recovery.StopTrajectoryTracker + print: False +# Driving behavior for the GEM vehicle following a fixed route +drive: + perception: + state_estimation : GNSSStateEstimator + perception_normalization : StandardPerceptionNormalizer + # agent_detection: + # type: agent_detection.GazeboAgentDetector + # args: + # tracked_model_prefixes: ['pedestrian', 'car', 'bicycle'] + + # obstacle_detection: + # type: cone_detection.ConeDetector3D + # # args: + # camera_name: front_right #[front, front_right] + # camera_calib_file: ./GEMstack/knowledge/calibration/cameras.yaml + + # # optional overrides + # enable_tracking: False + # visualize_2d: False + # use_cyl_roi: False + # save_data: False + # orientation: False + # use_start_frame: False + + planning: + relations_estimation: pedestrian_yield_logic.PedestrianYielder + mission_planning_example: + type: MissionPlanningExample + planning_route: + type: PlanningRoutePlanner + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + args: {desired_speed: 2.5} #approximately 5mph + print: False + + motion_planning: + type: planning_trajectory.QuinticSplineScurveTrajectoryPlanner + args: + accelaration: 5.0 + desired_speed: 2.0 + deceleration: 2.0 + emergency_brake: 8.0 + # motion_planning: yield_spline_planner.SplinePlanner +log: + # Specify the top-level folder to save the log files. Default is 'logs' + #folder : 'logs' + # If prefix is specified, then the log folder will be named with the prefix followed by the date and time. Default no prefix + #prefix : 'fixed_route_' + # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix + #suffix : 'test3' + # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics. + ros_topics : [] + # Specify options to pass to rosbag record. Default is no options. + #rosbag_options : '--split --size=1024' + # If True, then record all readings / commands of the vehicle interface. Default False + vehicle_interface : True + # Specify which components to record to behavior.json. Default records nothing + components : ['state_estimation','trajectory_tracking', 'agent_detection'] + # Specify which components of state to record to state.json. Default records nothing + #state: ['all'] + # Specify the rate in Hz at which to record state to state.json. Default records at the pipeline's rate + #state_rate: 10 +replay: # Add items here to set certain topics / inputs to be replayed from logs + # Specify which log folder to replay from + log: + # For replaying sensor data, try !include "../knowledge/defaults/standard_sensor_ros_topics.yaml" + ros_topics : [] + components : [] + +#usually can keep this constant +computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" + +after: + show_log_folder: True #set to false to avoid showing the log folder + +#on load, variants will overload the settings structure +variants: + #sim variant doesn't execute on the real vehicle + #real variant executes on the real robot + sim: + run: + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + + drive: + perception: + state_estimation : OmniscientStateEstimator + agent_detection : OmniscientAgentDetector + # visualization: !include "klampt_visualization.yaml" + visualization: [!include "mpl_visualization.yaml", !include "klampt_visualization.yaml"] + gazebo: + run: + mode: simulation + vehicle_interface: + type: gem_gazebo.GEMGazeboInterface + drive: + perception: + state_estimation: GNSSStateEstimator # Matches your Gazebo GNSS implementation + agent_detection: + type: agent_detection.GazeboAgentDetector + args: + tracked_model_prefixes: ['pedestrian', 'car', 'bicycle'] + obstacle_detection: + type: obstacle_detection.GazeboObstacleDetector + args: + tracked_obstacle_prefixes: ['cone'] + + + planning: + # Adding your custom relation estimation + relations_estimation: pedestrian_yield_logic.PedestrianYielder + # Fixed route with pure pursuit + mission_planning_example: + type: MissionPlanningExample + planning_route: + type: PlanningRoutePlanner + + # motion_planning: longitudinal_planning.YieldTrajectoryPlanner + # motion_planning: yield_spline_planner.SplinePlanner + motion_planning: + type: planning_trajectory.QuinticSplineScurveTrajectoryPlanner + args: + accelaration: 5.0 + desired_speed: 2.0 + deceleration: 2.0 + emergency_brake: 8.0 + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + args: {desired_speed: 'path'} #approximately 5mph + print: True + + visualization: !include "mpl_visualization.yaml" + log_ros: + log: + ros_topics : !include "../GEMstack/knowledge/defaults/standard_ros_topics.yaml"