diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index a8841062..5bfcf3c8 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -4,11 +4,25 @@ """ import math +from enum import Enum +from typing import Optional +import threading from .. import merged_odometry_detections from ..common.modules.logger import logger +class DetectionSelectionStrategy(Enum): + """ + Strategies for selecting which detection to use when multiple targets are detected. + """ + + NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center + LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area + HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence + FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) + + class AutoLandingInformation: """ Information necessary for the LANDING_TARGET MAVLink command. @@ -44,17 +58,21 @@ def create( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.NEAREST_TO_CENTER, ) -> "tuple [bool, AutoLanding | None ]": """ fov_x: The horizontal camera field of view in degrees. fov_y: The vertical camera field of view in degrees. im_w: Width of image. im_h: Height of image. + selection_strategy: Strategy for selecting which detection to use when multiple targets are detected. Returns an AutoLanding object. """ - return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger) + return True, AutoLanding( + cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy + ) def __init__( self, @@ -64,6 +82,7 @@ def __init__( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy, ) -> None: """ Private constructor, use create() method. @@ -75,10 +94,75 @@ def __init__( self.im_h = im_h self.im_w = im_w self.__logger = local_logger + self.__selection_strategy = selection_strategy + + def _select_detection(self, detections: list) -> Optional[int]: + """ + Select which detection to use based on the configured strategy. + + Returns the index of the selected detection, or None if no suitable detection found. + """ + if not detections: + return None + + if len(detections) == 1: + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: + # Find detection closest to image center + image_center_x = self.im_w / 2 + image_center_y = self.im_h / 2 + + min_distance = float("inf") + selected_index = 0 + + for i, detection in enumerate(detections): + det_center_x, det_center_y = detection.get_centre() + distance = math.sqrt( + (det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2 + ) + if distance < min_distance: + min_distance = distance + selected_index = i + + return selected_index + + if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: + # Find detection with largest bounding box area + max_area = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + width = detection.x_2 - detection.x_1 + height = detection.y_2 - detection.y_1 + area = width * height + if area > max_area: + max_area = area + selected_index = i + + return selected_index + + if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # Find detection with highest confidence + max_confidence = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + if detection.confidence > max_confidence: + max_confidence = detection.confidence + selected_index = i + + return selected_index + + # Default fallback + return 0 def run( self, odometry_detections: merged_odometry_detections.MergedOdometryDetections - ) -> "tuple[bool, AutoLandingInformation]": + ) -> "tuple[bool, AutoLandingInformation | None]": """ Calculates the x and y angles in radians of the bounding box based on its center. @@ -87,8 +171,26 @@ def run( Returns an AutoLandingInformation object. """ - # TODO: Devise better algorithm to pick which detection to land at if several are detected - x_center, y_center = odometry_detections.detections[0].get_centre() + # Check if we have any detections + if not odometry_detections.detections: + self.__logger.warning("No detections available for auto-landing", True) + return False, None + + # Select which detection to use + selected_index = self._select_detection(odometry_detections.detections) + if selected_index is None: + self.__logger.error("Failed to select detection for auto-landing", True) + return False, None + + selected_detection = odometry_detections.detections[selected_index] + + # Log selection information + self.__logger.info( + f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}", + True, + ) + + x_center, y_center = selected_detection.get_centre() angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w angle_y = (y_center - self.im_h / 2) * (self.fov_y * (math.pi / 180)) / self.im_h @@ -108,3 +210,94 @@ def run( ) return True, AutoLandingInformation(angle_x, angle_y, target_to_vehicle_dist) + + +class AutoLandingController: + """ + Controller for turning auto-landing on/off. + """ + + __create_key = object() + + @classmethod + def create( + cls, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> "tuple[bool, AutoLandingController | None]": + """ + Create an AutoLandingController instance. + + auto_landing: The AutoLanding instance to control. + local_logger: Logger instance for logging. + + Returns an AutoLandingController object. + """ + return True, AutoLandingController(cls.__create_key, auto_landing, local_logger) + + def __init__( + self, + class_private_create_key: object, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is AutoLandingController.__create_key, "Use create() method" + + self.__auto_landing = auto_landing + self.__logger = local_logger + self.__enabled = False + self.__enabled_lock = threading.Lock() + + def is_enabled(self) -> bool: + """ + Check if auto-landing is enabled. + """ + with self.__enabled_lock: + return self.__enabled + + def enable(self) -> bool: + """ + Enable auto-landing system. + + Returns True if successfully enabled, False otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + self.__enabled = True + self.__logger.info("Auto-landing system enabled", True) + return True + self.__logger.warning("Auto-landing system already enabled", True) + return False + + def disable(self) -> bool: + """ + Disable auto-landing system. + + Returns True if successfully disabled, False otherwise. + """ + with self.__enabled_lock: + if self.__enabled: + self.__enabled = False + self.__logger.info("Auto-landing system disabled", True) + return True + self.__logger.warning("Auto-landing system already disabled", True) + return False + + def process_detections( + self, odometry_detections: merged_odometry_detections.MergedOdometryDetections + ) -> "tuple[bool, AutoLandingInformation | None]": + """ + Process detections if auto-landing is enabled. + + Returns landing information if processing was successful, None otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + return False, None + + # Process the detections using the auto-landing module + result, landing_info = self.__auto_landing.run(odometry_detections) + return result, landing_info diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 62890e04..59a45f74 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -12,22 +13,40 @@ from ..common.modules.logger import logger +class AutoLandingCommand: + """ + Command structure for controlling auto-landing operations. + """ + + def __init__(self, command: str) -> None: + self.command = command + + def auto_landing_worker( fov_x: float, fov_y: float, im_h: float, im_w: float, period: float, + detection_strategy: auto_landing.DetectionSelectionStrategy, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + command_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ - Worker process. + Worker process for auto-landing operations. - period: Wait time in seconds. - input_queue and output_queue are data queues. - controller is how the main process communicates to this worker process. + fov_x: Horizontal field of view in degrees. + fov_y: Vertical field of view in degrees. + im_h: Image height in pixels. + im_w: Image width in pixels. + period: Wait time in seconds between processing cycles. + detection_strategy: Strategy for selecting detection when multiple targets are present. + input_queue: Queue for receiving merged odometry detections. + output_queue: Queue for sending auto-landing information. + command_queue: Queue for receiving enable/disable commands. + controller: Worker controller for pause/exit management. """ worker_name = pathlib.Path(__file__).stem @@ -42,25 +61,106 @@ def auto_landing_worker( local_logger.info("Logger initialized", True) - result, auto_lander = auto_landing.AutoLanding.create(fov_x, fov_y, im_h, im_w, local_logger) - + # Create auto-landing instance + result, auto_lander = auto_landing.AutoLanding.create( + fov_x, fov_y, im_h, im_w, local_logger, detection_strategy + ) if not result: - local_logger.error("Worker failed to create class object", True) + local_logger.error("Worker failed to create AutoLanding object", True) return # Get Pylance to stop complaining assert auto_lander is not None + # Create auto-landing controller + result, landing_controller = auto_landing.AutoLandingController.create( + auto_lander, local_logger + ) + if not result: + local_logger.error("Worker failed to create AutoLandingController object", True) + return + + # Get Pylance to stop complaining + assert landing_controller is not None + + local_logger.info("Auto-landing worker initialized successfully", True) + while not controller.is_exit_requested(): controller.check_pause() - input_data = input_queue.queue.get() - if input_data is None: - continue + # Process commands first + _process_commands(command_queue, landing_controller, local_logger) - result, value = auto_lander.run(input_data) - if not result: - continue + # Process detections if available + input_data = None + try: + input_data = input_queue.queue.get_nowait() + except queue.Empty: + # No data available, continue + pass + + if input_data is not None: + result, landing_info = landing_controller.process_detections(input_data) + if result and landing_info: + output_queue.queue.put(landing_info) - output_queue.queue.put(value) time.sleep(period) + + +def _process_commands( + command_queue: queue_proxy_wrapper.QueueProxyWrapper, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Process all available commands in the command queue. + + command_queue: Queue containing AutoLandingCommand objects. + landing_controller: Controller instance to execute commands on. + local_logger: Logger for command processing. + """ + while True: + try: + command = command_queue.queue.get_nowait() + if command is None: + break + + if isinstance(command, AutoLandingCommand): + _execute_command(command, landing_controller, local_logger) + else: + local_logger.warning(f"Received invalid command type: {type(command)}", True) + + except queue.Empty: + # No more commands available + break + + +def _execute_command( + command: AutoLandingCommand, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Execute an auto-landing command. + + command: Command to execute. + landing_controller: Controller instance to execute command on. + local_logger: Logger for command execution. + """ + local_logger.info(f"Executing command: {command.command}", True) + + success = False + if command.command == "enable": + success = landing_controller.enable() + elif command.command == "disable": + success = landing_controller.disable() + else: + local_logger.error( + f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True + ) + return + + if success: + local_logger.info(f"Command '{command.command}' executed successfully", True) + else: + local_logger.error(f"Command '{command.command}' failed to execute", True) diff --git a/modules/common b/modules/common index a48579ef..c0ffaa5c 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit a48579ef93b187e0a6b72ca941108866e270f089 +Subproject commit c0ffaa5cb4bc55a1eeb0f02d4418eaab10c69b81 diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py new file mode 100644 index 00000000..07e9edaf --- /dev/null +++ b/tests/integration/test_auto_landing_worker.py @@ -0,0 +1,248 @@ +""" +Test auto-landing worker process. +""" + +import datetime +import multiprocessing as mp +import pathlib +import time + +import numpy as np + +from modules import detections_and_time +from modules import merged_odometry_detections +from modules.auto_landing import auto_landing +from modules.auto_landing import auto_landing_worker +from modules.common.modules import orientation +from modules.common.modules import position_local +from modules.common.modules.mavlink import drone_odometry_local +from modules.common.modules.logger import logger +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + +# Worker parameters +FOV_X = 90.0 # degrees +FOV_Y = 90.0 # degrees +IMAGE_HEIGHT = 640.0 # pixels +IMAGE_WIDTH = 640.0 # pixels +WORKER_PERIOD = 0.1 # seconds +DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER +LOG_TIMINGS = False # Disable timing logging for the test + +# Ensure logs directory exists and create timestamped subdirectory +LOG_DIR = pathlib.Path("logs") +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# Create a timestamped subdirectory for this test session +TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" +test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT) +test_session_dir.mkdir(parents=True, exist_ok=True) + + +def simulate_detection_input( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + detections: list, + position: tuple, + orientation_angles: tuple, +) -> None: + """ + Create and place merged odometry detections into the input queue. + """ + # Create drone position + result, drone_position = position_local.PositionLocal.create( + position[0], position[1], position[2] + ) + assert result + assert drone_position is not None + + # Create drone orientation + result, drone_orientation = orientation.Orientation.create( + orientation_angles[0], orientation_angles[1], orientation_angles[2] + ) + assert result + assert drone_orientation is not None + + # Create drone odometry + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + assert result + assert drone_odometry is not None + + # Create merged odometry detections + result, merged = merged_odometry_detections.MergedOdometryDetections.create( + drone_odometry, detections + ) + assert result + assert merged is not None + + input_queue.queue.put(merged) + + +def create_test_detection( + bbox: list, label: int, confidence: float +) -> detections_and_time.Detection: + """ + Create a test detection with the given parameters. + """ + result, detection = detections_and_time.Detection.create( + np.array(bbox, dtype=np.float32), label, confidence + ) + assert result + assert detection is not None + return detection + + +def main() -> int: + """ + Main function. + """ + # Logger + test_name = pathlib.Path(__file__).stem + result, local_logger = logger.Logger.create(test_name, LOG_TIMINGS) + assert result # Logger initialization should succeed + assert local_logger is not None + + # Setup + controller = worker_controller.WorkerController() + + mp_manager = mp.Manager() + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Create worker process + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + FOV_X, + FOV_Y, + IMAGE_HEIGHT, + IMAGE_WIDTH, + WORKER_PERIOD, + DETECTION_STRATEGY, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + # Start worker + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test 1: Worker should not process detections when disabled (default state) + detection1 = create_test_detection([200, 200, 400, 400], 1, 0.9) + simulate_detection_input( + input_queue, + [detection1], + (0.0, 0.0, -50.0), # 50 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no output since auto-landing is disabled by default + assert output_queue.queue.empty() + + # Test 2: Enable auto-landing and verify it processes detections + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + time.sleep(0.2) + + # Now send the same detection - should be processed + detection2 = create_test_detection([300, 300, 500, 500], 2, 0.85) + simulate_detection_input( + input_queue, + [detection2], + (10.0, 5.0, -75.0), # 75 meters above ground + (0.1, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output now + assert not output_queue.queue.empty() + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + assert hasattr(landing_info, "angle_x") + assert hasattr(landing_info, "angle_y") + assert hasattr(landing_info, "target_dist") + + # Test 3: Test with multiple detections (should use NEAREST_TO_CENTER strategy) + detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center + detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320) + + simulate_detection_input( + input_queue, + [detection3, detection4], + (0.0, 0.0, -100.0), # 100 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output for the detection closest to center + assert not output_queue.queue.empty() + landing_info2 = output_queue.queue.get_nowait() + assert landing_info2 is not None + + # Test 4: Disable auto-landing and verify it stops processing + disable_command = auto_landing_worker.AutoLandingCommand("disable") + command_queue.queue.put(disable_command) + + time.sleep(0.2) + + # Send another detection - should not be processed + detection5 = create_test_detection([400, 400, 600, 600], 3, 0.95) + simulate_detection_input( + input_queue, + [detection5], + (0.0, 0.0, -60.0), + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no new output + assert output_queue.queue.empty() + + # Test 5: Test invalid command handling + invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") + command_queue.queue.put(invalid_command) + + time.sleep(0.2) + + # Worker should continue running despite invalid command + assert worker.is_alive() + + # Test 6: Test with no detections (empty detection list should not crash) + # This should not create a MergedOdometryDetections object since it requires non-empty detections + # So we just verify the worker continues running + + # Cleanup + controller.request_exit() + + # Drain queues + input_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + + # Wait for worker to finish + worker.join(timeout=5.0) + if worker.is_alive(): + worker.terminate() + worker.join() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!")