diff --git a/tensor_detector/src/holes.py b/tensor_detector/src/holes.py new file mode 100644 index 0000000..bb1a573 --- /dev/null +++ b/tensor_detector/src/holes.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 + + +def get_hole_size(self, hole): #move + x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) + hole_width = x_max - x_min + hole_height = y_max - y_min + hole_size = hole_height*hole_width + return hole_size + +def find_top_and_bottom_holes(self): #move + if self.plane_normal is None: + self.get_logger().warning("Plane normal not defined. Cannot compute hole positions.") + return + if self.torpedo_centroid is None: + self.get_logger().warning("Centroid not defined. Cannot compute hole positions.") + return + + hole_positions = [] + for hole in self.torpedo_holes: + x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) + bbox_center_x = (x_min + x_max) / 2 + bbox_center_y = (y_min + y_max) / 2 + + # Calculate 3D position using plane intersection + d = np.linalg.inv(self.intrinsic_matrix) @ np.array([bbox_center_x, bbox_center_y, 1.0]) + d = d / np.linalg.norm(d) + + n = self.plane_normal + p0 = self.torpedo_centroid + + numerator = np.dot(n, p0) + denominator = np.dot(n, d) + if denominator == 0: + self.get_logger().warning(f"Denominator zero for hole center ({bbox_center_x}, {bbox_center_y}). Skipping this hole.") + continue + t = numerator / denominator + if t <= 0: + self.get_logger().warning(f"Intersection behind the camera for hole center ({bbox_center_x}, {bbox_center_y}). Skipping this hole.") + continue + point_3d = t * d + + # Store hole with its 3D position and Y coordinate for sorting + hole_positions.append((hole, point_3d, point_3d[1])) + #self.get_logger().info(f"Hole position computed: Y={point_3d[1]}") + + if not hole_positions: + self.get_logger().warning("No valid hole positions computed.") + return + + # Sort holes based on Y coordinate (height) + hole_positions.sort(key=lambda x: x[2]) + + self.torpedo_bottom_hole = hole_positions[-1][0] # Lower Y = bottom + self.torpedo_top_hole = hole_positions[0][0] # Higher Y = top + + #self.get_logger().info(f"Bottom hole Y: {hole_positions[0][2]}") + #self.get_logger().info(f"Top hole Y: {hole_positions[-1][2]}") + +def find_smallest_and_largest_holes(self): #move + if self.plane_normal is None or self.mapping_map_centroid is None: + self.get_logger().warning("Plane normal or centroid not defined. Cannot compute hole sizes.") + return + + hole_sizes = [] + for hole in self.mapping_holes: + x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) + corners_2d = [ + (x_min, y_min), + (x_max, y_min), + (x_max, y_max), + (x_min, y_max) + ] + corners_3d = [] + for (u, v) in corners_2d: + d = np.linalg.inv(self.intrinsic_matrix) @ np.array([u, v, 1.0]) + n = self.plane_normal + p0 = self.mapping_map_centroid + + numerator = np.dot(n, p0) + denominator = np.dot(n, d) + if denominator == 0: + self.get_logger().warning(f"Denominator zero for point ({u}, {v}). Skipping this corner.") + continue + t = numerator / denominator + if t <= 0: + self.get_logger().warning(f"Intersection behind the camera for point ({u}, {v}). Skipping this corner.") + continue + point_3d = t * d + corners_3d.append(point_3d) + + if len(corners_3d) == 4: + width_vector = corners_3d[1] - corners_3d[0] + height_vector = corners_3d[3] - corners_3d[0] + width = np.linalg.norm(width_vector) + height = np.linalg.norm(height_vector) + hole_size = width * height + hole_sizes.append((hole, hole_size)) + #self.get_logger().info(f"Hole size computed: {hole_size}") + else: + self.get_logger().warning(f"Not enough valid corners for hole. Expected 4, got {len(corners_3d)}") + continue + + if not hole_sizes: + self.get_logger().warning("No valid hole sizes computed.") + return + + # Sort holes based on hole_size + hole_sizes.sort(key=lambda x: x[1]) + + self.smallest_hole = hole_sizes[0][0] + self.largest_hole = hole_sizes[-1][0] + + #self.get_logger().info(f"Smallest hole size: {hole_sizes[0][1]}") + #self.get_logger().info(f"Largest hole size: {hole_sizes[-1][1]}") + +def cleanup_old_holes(self, age_threshold=2.0): #move + # Get the current time as a builtin_interfaces.msg.Time object + current_time = self.get_clock().now().to_msg() + + # for bbox, timestamp in self.holes: + # self.get_logger().info(f"Time for cleanup: {((current_time.sec - timestamp.sec) + (current_time.nanosec - timestamp.nanosec) * 1e-9)}") + # Filter out holes older than the specified age threshold + self.holes = [(bbox, timestamp) for bbox, timestamp in self.holes + if ((current_time.sec - timestamp.sec) + (current_time.nanosec - timestamp.nanosec) * 1e-9) < age_threshold] \ No newline at end of file diff --git a/tensor_detector/src/yolo_orientation.py b/tensor_detector/src/yolo_orientation.py index 7a06511..7a2100c 100644 --- a/tensor_detector/src/yolo_orientation.py +++ b/tensor_detector/src/yolo_orientation.py @@ -17,6 +17,7 @@ import yaml import math from std_srvs.srv import SetBool +from holes import get_hole_size, find_smallest_and_largest_holes, find_top_and_bottom_holes, cleanup_old_holes class YOLONode(Node): def __init__(self): @@ -520,20 +521,20 @@ def process_slalom_red_detections(self, detections_array): self.slalom_red_detections = [] def image_callback(self, msg: Image): - + if self.log_processing_time: self.detection_time = time.time() - + if self.depth_image is None or not self.camera_info_gathered: self.get_logger().warning("Skipping image because either no depth image or camera info is available.", throttle_duration_sec=1) return - + cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") self.gray_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) if cv_image is None: return results = self.model(cv_image, verbose=False, iou=self.iou, conf=self.conf) - + detections = Detection3DArray() detections.header.frame_id = self.frame_id self.detection_timestamp = msg.header.stamp @@ -541,10 +542,10 @@ def image_callback(self, msg: Image): detections.header.stamp = msg.header.stamp else: detections.header.stamp = self.get_clock().now().to_msg() - + if self.mask is None or self.mask.shape[:2] != cv_image.shape[:2]: self.mask = np.zeros(cv_image.shape[:2], dtype=np.uint8) - + # Reset mapping holes each image self.mapping_holes = [] self.torpedo_holes = [] @@ -553,83 +554,19 @@ def image_callback(self, msg: Image): self.torpedo_bottom_hole = None self.largest_hole = None self.smallest_hole = None - + for result in results: for box in result.boxes.cpu().numpy(): - if box.conf[0] <= self.conf: - continue - class_id = box.cls[0] - - if class_id in self.class_id_map: - conf = box.conf[0] - #self.get_logger().info(f"class id: {class_id}") - # If its a hole, store it, otherwise make the detection message - if self.class_id_map[class_id] == "mapping_hole": - - #self.get_logger().info(f"class id: {class_id}") - x_min, y_min, x_max, y_max = map(int, box.xyxy[0]) - if self.use_incoming_timestamp: - self.holes.append(((x_min, y_min, x_max, y_max), self.detection_timestamp)) - else: - self.holes.append(((x_min, y_min, x_max, y_max), self.get_clock().now().to_msg())) - self.mapping_holes.append(box) - #self.get_logger().info(f"Holes after adding: {len(self.holes)}") - #self.get_logger().info(f"holes: {len(self.mapping_holes)}") - if self.class_id_map[class_id] == "torpedo_hole": - - #self.get_logger().info(f"class id: {class_id}") - x_min, y_min, x_max, y_max = map(int, box.xyxy[0]) - if self.use_incoming_timestamp: - self.holes.append(((x_min, y_min, x_max, y_max), self.detection_timestamp)) - else: - self.holes.append(((x_min, y_min, x_max, y_max), self.get_clock().now().to_msg())) - self.torpedo_holes.append(box) - #self.get_logger().info(f"Holes after adding: {len(self.holes)}") - #self.get_logger().info(f"holes: {len(self.mapping_holes)}") - elif class_id in self.class_id_map and self.class_id_map[class_id] == "slalom_red": - # Don't create detection immediately, store for later processing - detection_temp = self.create_detection3d_message(box, cv_image, conf) - if detection_temp and detection_temp.results: - # Extract the centroid and quaternion from the detection - result_temp= detection_temp.results[0] - centroid = [ - result_temp.pose.pose.position.x, - result_temp.pose.pose.position.y, - result_temp.pose.pose.position.z - ] - quat = [ - result_temp.pose.pose.orientation.x, - result_temp.pose.pose.orientation.y, - result_temp.pose.pose.orientation.z, - result_temp.pose.pose.orientation.w - ] - - # Store detection data for sorting - x_min, y_min, x_max, y_max = map(int, box.xyxy[0]) - bbox_width = x_max - x_min - bbox_height = y_max - y_min - - self.slalom_red_detections.append({ - 'centroid': centroid, - 'quat': quat, - 'conf': box.conf[0], - 'bbox_width': bbox_width, - 'bbox_height': bbox_height - }) - else: - detection = self.create_detection3d_message(box, cv_image, conf) - - if detection: - detections.detections.append(detection) - - self.mask.fill(0) - for contour in result.masks.xy: - contour = np.array(contour, dtype=np.int32) - cv2.fillPoly(self.mask, [contour], 255) - mask_msg = self.bridge.cv2_to_imgmsg(self.mask,encoding="mono8") - #self.mask_publisher.publish(mask_msg) - - + self.handle_box(box, cv_image, detections) + + self.mask.fill(0) + for contour in result.masks.xy: + contour = np.array(contour, dtype=np.int32) + cv2.fillPoly(self.mask, [contour], 255) + mask_msg = self.bridge.cv2_to_imgmsg(self.mask,encoding="mono8") + #self.mask_publisher.publish(mask_msg) + + # Create detection3d for the holes if there are 4 if len(self.mapping_holes) == 4: #self.get_logger().info(f"holes: {len(self.mapping_holes)}") @@ -678,7 +615,7 @@ def image_callback(self, msg: Image): self.publish_markers(self.temp_markers) self.temp_markers = [] # Clear the list for the next frame - + annotated_frame = results[0].plot() self.publish_accumulated_point_cloud() @@ -701,148 +638,80 @@ def image_callback(self, msg: Image): if self.has_subscribers(self.detection_publisher): self.detection_publisher.publish(detections) self.detection_id_counter = 0 - - def get_hole_size(self, hole): - x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) - hole_width = x_max - x_min - hole_height = y_max - y_min - hole_size = hole_height*hole_width - return hole_size - - # def find_smallest_and_largest_holes(self): - # hole_sizes = [] - # for hole in self.mapping_holes: - # hole_size = self.get_hole_size(hole) - - # if self.largest_hole is None: - # self.largest_hole = hole - # else: - # largest_hole_size = self.get_hole_size(self.largest_hole) - # if hole_size > largest_hole_size: - # self.largest_hole = hole - - # if self.smallest_hole is None: - # self.smallest_hole = hole - # else: - # smallest_hole_size = self.get_hole_size(self.smallest_hole) - # if hole_size < smallest_hole_size: - # self.smallest_hole = hole - - def find_top_and_bottom_holes(self): - if self.plane_normal is None: - self.get_logger().warning("Plane normal not defined. Cannot compute hole positions.") - return - if self.torpedo_centroid is None: - self.get_logger().warning("Centroid not defined. Cannot compute hole positions.") - return + def handle_box(self, box, cv_image, detections): + conf = box.conf[0] + class_id = box.cls[0] - hole_positions = [] - for hole in self.torpedo_holes: - x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) - bbox_center_x = (x_min + x_max) / 2 - bbox_center_y = (y_min + y_max) / 2 - - # Calculate 3D position using plane intersection - d = np.linalg.inv(self.intrinsic_matrix) @ np.array([bbox_center_x, bbox_center_y, 1.0]) - d = d / np.linalg.norm(d) - - n = self.plane_normal - p0 = self.torpedo_centroid - - numerator = np.dot(n, p0) - denominator = np.dot(n, d) - if denominator == 0: - self.get_logger().warning(f"Denominator zero for hole center ({bbox_center_x}, {bbox_center_y}). Skipping this hole.") - continue - t = numerator / denominator - if t <= 0: - self.get_logger().warning(f"Intersection behind the camera for hole center ({bbox_center_x}, {bbox_center_y}). Skipping this hole.") - continue - point_3d = t * d - - # Store hole with its 3D position and Y coordinate for sorting - hole_positions.append((hole, point_3d, point_3d[1])) - #self.get_logger().info(f"Hole position computed: Y={point_3d[1]}") - - if not hole_positions: - self.get_logger().warning("No valid hole positions computed.") + if conf <= self.conf: return - - # Sort holes based on Y coordinate (height) - hole_positions.sort(key=lambda x: x[2]) - - self.torpedo_bottom_hole = hole_positions[-1][0] # Lower Y = bottom - self.torpedo_top_hole = hole_positions[0][0] # Higher Y = top - - #self.get_logger().info(f"Bottom hole Y: {hole_positions[0][2]}") - #self.get_logger().info(f"Top hole Y: {hole_positions[-1][2]}") - - def find_smallest_and_largest_holes(self): - if self.plane_normal is None or self.mapping_map_centroid is None: - self.get_logger().warning("Plane normal or centroid not defined. Cannot compute hole sizes.") + if class_id not in self.class_id_map: return + class_name = self.class_id_map[class_id] - hole_sizes = [] - for hole in self.mapping_holes: - x_min, y_min, x_max, y_max = map(int, hole.xyxy[0]) - corners_2d = [ - (x_min, y_min), - (x_max, y_min), - (x_max, y_max), - (x_min, y_max) - ] - corners_3d = [] - for (u, v) in corners_2d: - d = np.linalg.inv(self.intrinsic_matrix) @ np.array([u, v, 1.0]) - n = self.plane_normal - p0 = self.mapping_map_centroid + if class_name == "mapping_hole": + self.handle_mapping_hole(box) + elif class_name == "torpedo_hole": + self.handle_torpedo_hole(box) + elif class_name == "slalom_red": + self.handle_slalom_red(box, cv_image) + else: + detection = self.create_detection3d_message(box, cv_image, conf) + if detection: + detections.detections.append(detection) + + def handle_mapping_hole(self, box): + x_min, y_min, x_max, y_max = map(int, box.xyxy[0]) + timestamp = ( + self.detection_timestamp + if self.use_incoming_timestamp + else self.get_clock().now().to_msg() + ) + self.holes.append(((x_min, y_min, x_max, y_max), timestamp)) + self.mapping_holes.append(box) - numerator = np.dot(n, p0) - denominator = np.dot(n, d) - if denominator == 0: - self.get_logger().warning(f"Denominator zero for point ({u}, {v}). Skipping this corner.") - continue - t = numerator / denominator - if t <= 0: - self.get_logger().warning(f"Intersection behind the camera for point ({u}, {v}). Skipping this corner.") - continue - point_3d = t * d - corners_3d.append(point_3d) - - if len(corners_3d) == 4: - width_vector = corners_3d[1] - corners_3d[0] - height_vector = corners_3d[3] - corners_3d[0] - width = np.linalg.norm(width_vector) - height = np.linalg.norm(height_vector) - hole_size = width * height - hole_sizes.append((hole, hole_size)) - #self.get_logger().info(f"Hole size computed: {hole_size}") - else: - self.get_logger().warning(f"Not enough valid corners for hole. Expected 4, got {len(corners_3d)}") - continue - if not hole_sizes: - self.get_logger().warning("No valid hole sizes computed.") + def handle_torpedo_hole(self, box): + x_min, y_min, x_max, y_max = map(int, box.xyxy[0]) + timestamp = ( + self.detection_timestamp + if self.use_incoming_timestamp + else self.get_clock().now().to_msg() + + ) + self.holes.append(((x_min, y_min, x_max, y_max), timestamp)) + self.torpedo_holes.append(box) + + def handle_slalom_red(self, box, cv_image): + detection_temp = self.create_detection3d_message(box, cv_image, box.conf[0]) + if not detection_temp or not detection_temp.results: return - - # Sort holes based on hole_size - hole_sizes.sort(key=lambda x: x[1]) - - self.smallest_hole = hole_sizes[0][0] - self.largest_hole = hole_sizes[-1][0] - - #self.get_logger().info(f"Smallest hole size: {hole_sizes[0][1]}") - #self.get_logger().info(f"Largest hole size: {hole_sizes[-1][1]}") - - def cleanup_old_holes(self, age_threshold=2.0): - # Get the current time as a builtin_interfaces.msg.Time object - current_time = self.get_clock().now().to_msg() - - # for bbox, timestamp in self.holes: - # self.get_logger().info(f"Time for cleanup: {((current_time.sec - timestamp.sec) + (current_time.nanosec - timestamp.nanosec) * 1e-9)}") - # Filter out holes older than the specified age threshold - self.holes = [(bbox, timestamp) for bbox, timestamp in self.holes - if ((current_time.sec - timestamp.sec) + (current_time.nanosec - timestamp.nanosec) * 1e-9) < age_threshold] + + result_temp = detection_temp.results[0] + + centroid = [ + result_temp.pose.pose.position.x, + result_temp.pose.pose.position.y, + result_temp.pose.pose.position.z + ] + + quat = [ + result_temp.pose.pose.orientation.x, + result_temp.pose.pose.orientation.y, + result_temp.pose.pose.orientation.z, + result_temp.pose.pose.orientation.w + ] + + x_min, y_min, x_max, y_max, = map(int, box.xyxy[0]) + + self.slalom_red_detections.append({ + 'centroid':centroid, + 'quat':quat, + 'conf':box.conf[0], + 'bbox_width': x_max - x_min, + 'bbox_height': y_max - y_min + }) + + def generate_unique_detection_id(self): # Increment and return the counter to get a unique ID for each detection @@ -989,39 +858,7 @@ def create_detection3d_message(self, box, cv_image, conf, hole_scale=None): detection.results.append(self.create_object_hypothesis_with_pose(class_name, hole_centroid, hole_quat, conf)) return detection - # elif class_name == "torpedo_open": - # self.latest_bbox_class_7 = (x_min, y_min, x_max, y_max) - # elif class_name == "torpedo_closed": - # self.latest_bbox_class_8 = (x_min, y_min, x_max, y_max) - # elif class_name == "torpedo_hole": - # if self.open_torpedo_centroid is not None and self.open_torpedo_quat is not None and self.latest_bbox_class_7 and self.is_inside_bbox(bbox, self.latest_bbox_class_7): - # class_name = "torpedo_open_hole" - # hole_quat = self.open_torpedo_quat - # hole_centroid = self.calculate_centroid(bbox_center_x, bbox_center_y, self.open_torpedo_centroid[2]) - # elif self.closed_torpedo_centroid is not None and self.closed_torpedo_quat is not None and self.latest_bbox_class_8 and self.is_inside_bbox(bbox, self.latest_bbox_class_8): - # class_name = "torpedo_closed_hole" - # hole_quat = self.closed_torpedo_quat - # hole_centroid = self.calculate_centroid(bbox_center_x, bbox_center_y, self.closed_torpedo_centroid[2]) - # else: - # return None - - # if self.use_incoming_timestamp: - # self.holes.append(((x_min, y_min, x_max, y_max), self.detection_timestamp)) - # else: - # self.holes.append(((x_min, y_min, x_max, y_max), self.get_clock().now().to_msg())) - - - # self.publish_marker(hole_quat, hole_centroid, class_name, bbox_width, bbox_height) - - # # Create Detection3D message - # detection = Detection3D() - # detection.header.frame_id = self.frame_id - # if self.use_incoming_timestamp: - # detection.header.stamp = self.detection_timestamp - # else: - # detection.header.stamp = self.get_clock().now().to_msg() - # detection.results.append(self.create_object_hypothesis_with_pose(class_name, hole_centroid, hole_quat, conf)) - # return detection + if class_name == "slalom_red": shrink_x = (x_max - x_min) * 0.3 @@ -1097,51 +934,6 @@ def create_detection3d_message(self, box, cv_image, conf, hole_scale=None): # Continue with feature detection using the adjusted mask_roi masked_gray_image = cv2.bitwise_and(cropped_gray_image, cropped_gray_image, mask=mask_roi) - - # elif class_name == "slalom_close": - # depth_value = self.depth_image[int(bbox_center_y), int(bbox_center_x)] - # centroid = self.calculate_centroid(bbox_center_x, bbox_center_y, float(depth_value)) - # quat, _ = self.calculate_quaternion_and_euler_angles(-self.default_normal) - - # self.publish_marker(quat, centroid, class_name, bbox_width, bbox_height) - - # # Create Detection3D message - # detection = Detection3D() - # detection.header.frame_id = self.frame_id - # if self.use_incoming_timestamp: - # detection.header.stamp = self.detection_timestamp - # else: - # detection.header.stamp = self.get_clock().now().to_msg() - - # # Set the pose - # class_name = "slalom_close" - # detection.results.append(self.create_object_hypothesis_with_pose(class_name, centroid, quat, conf)) - - # return detection - - - - # elif class_name == "slalom_red": - # depth_value = self.depth_image[int(bbox_center_y), int(bbox_center_x)] - # centroid = self.calculate_centroid(bbox_center_x, bbox_center_y, float(depth_value)) - # quat, _ = self.calculate_quaternion_and_euler_angles(-self.default_normal) - - # self.publish_marker(quat, centroid, class_name, bbox_width, bbox_height) - - - # # Create Detection3D message - # detection = Detection3D() - # detection.header.frame_id = self.frame_id - # if self.use_incoming_timestamp: - # detection.header.stamp = self.detection_timestamp - # else: - # detection.header.stamp = self.get_clock().now().to_msg() - - # # Set the pose - # class_name = "slalom_close" - # detection.results.append(self.create_object_hypothesis_with_pose(class_name, centroid, quat, conf)) - - # return detection elif class_name == "slalom_red": # Sample the depth value at the center of the bounding box @@ -1266,26 +1058,7 @@ def create_detection3d_message(self, box, cv_image, conf, hole_scale=None): detection.results.append(self.create_object_hypothesis_with_pose(class_name, centroid, quat, conf)) return detection - - # else: - # self.get_logger().info(f"wtf: {class_name}") - # if self.latest_buoy is not None and class_name == "buoy": - # centroid = self.calculate_centroid(bbox_center_x, bbox_center_y, self.latest_buoy[2]) - # quat, _ = self.calculate_quaternion_and_euler_angles(-self.default_normal) - # self.publish_marker(quat, centroid, class_name, bbox_width, bbox_height) - # detection = Detection3D() - # detection.header.frame_id = self.frame_id - # if self.use_incoming_timestamp: - # detection.header.stamp = self.detection_timestamp - # else: - # detection.header.stamp = self.get_clock().now().to_msg() - - # # Set the pose - # detection.results.append(self.create_object_hypothesis_with_pose(class_name, centroid, quat, conf)) - - # return detection - - # return None + def calculate_centroid(self, center_x, center_y, z): center_3d_x = (center_x - self.cx) * z / self.fx