From ff49312ba11cbfae72b500645e0e4a3f78a036d1 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 21:08:57 -0400 Subject: [PATCH 1/9] Complete auto landing module --- modules/auto_landing/auto_landing.py | 201 ++++- modules/auto_landing/auto_landing_worker.py | 124 ++- modules/common | 2 +- tests/integration/test_auto_landing_worker.py | 715 ++++++++++++++++++ 4 files changed, 1022 insertions(+), 20 deletions(-) create mode 100644 tests/integration/test_auto_landing_worker.py diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index a8841062..3b7ac5b1 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -4,11 +4,24 @@ """ import math +from enum import Enum +from typing import Optional +import threading from .. import merged_odometry_detections from ..common.modules.logger import logger +class DetectionSelectionStrategy(Enum): + """ + Strategies for selecting which detection to use when multiple targets are detected. + """ + NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center + LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area + HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence + FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) + + class AutoLandingInformation: """ Information necessary for the LANDING_TARGET MAVLink command. @@ -44,17 +57,19 @@ def create( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy = DetectionSelectionStrategy.NEAREST_TO_CENTER, ) -> "tuple [bool, AutoLanding | None ]": """ fov_x: The horizontal camera field of view in degrees. fov_y: The vertical camera field of view in degrees. im_w: Width of image. im_h: Height of image. + selection_strategy: Strategy for selecting which detection to use when multiple targets are detected. Returns an AutoLanding object. """ - return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger) + return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy) def __init__( self, @@ -64,6 +79,7 @@ def __init__( im_h: float, im_w: float, local_logger: logger.Logger, + selection_strategy: DetectionSelectionStrategy, ) -> None: """ Private constructor, use create() method. @@ -75,10 +91,73 @@ def __init__( self.im_h = im_h self.im_w = im_w self.__logger = local_logger + self.__selection_strategy = selection_strategy + + def _select_detection(self, detections: list) -> Optional[int]: + """ + Select which detection to use based on the configured strategy. + + Returns the index of the selected detection, or None if no suitable detection found. + """ + if not detections: + return None + + if len(detections) == 1: + return 0 + + if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: + return 0 + + elif self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: + # Find detection closest to image center + image_center_x = self.im_w / 2 + image_center_y = self.im_h / 2 + + min_distance = float('inf') + selected_index = 0 + + for i, detection in enumerate(detections): + det_center_x, det_center_y = detection.get_centre() + distance = math.sqrt((det_center_x - image_center_x)**2 + (det_center_y - image_center_y)**2) + if distance < min_distance: + min_distance = distance + selected_index = i + + return selected_index + + elif self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: + # Find detection with largest bounding box area + max_area = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + width = detection.x_2 - detection.x_1 + height = detection.y_2 - detection.y_1 + area = width * height + if area > max_area: + max_area = area + selected_index = i + + return selected_index + + elif self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + # Find detection with highest confidence + max_confidence = 0 + selected_index = 0 + + for i, detection in enumerate(detections): + if detection.confidence > max_confidence: + max_confidence = detection.confidence + selected_index = i + + return selected_index + + # Default fallback + return 0 def run( self, odometry_detections: merged_odometry_detections.MergedOdometryDetections - ) -> "tuple[bool, AutoLandingInformation]": + ) -> "tuple[bool, AutoLandingInformation | None]": """ Calculates the x and y angles in radians of the bounding box based on its center. @@ -86,9 +165,27 @@ def run( Returns an AutoLandingInformation object. """ - - # TODO: Devise better algorithm to pick which detection to land at if several are detected - x_center, y_center = odometry_detections.detections[0].get_centre() + + # Check if we have any detections + if not odometry_detections.detections: + self.__logger.warning("No detections available for auto-landing", True) + return False, None + + # Select which detection to use + selected_index = self._select_detection(odometry_detections.detections) + if selected_index is None: + self.__logger.error("Failed to select detection for auto-landing", True) + return False, None + + selected_detection = odometry_detections.detections[selected_index] + + # Log selection information + self.__logger.info( + f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}", + True, + ) + + x_center, y_center = selected_detection.get_centre() angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w angle_y = (y_center - self.im_h / 2) * (self.fov_y * (math.pi / 180)) / self.im_h @@ -108,3 +205,97 @@ def run( ) return True, AutoLandingInformation(angle_x, angle_y, target_to_vehicle_dist) + + +class AutoLandingController: + """ + Simple controller for turning auto-landing on/off. + """ + + __create_key = object() + + @classmethod + def create( + cls, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> "tuple[bool, AutoLandingController | None]": + """ + Create an AutoLandingController instance. + + auto_landing: The AutoLanding instance to control. + local_logger: Logger instance for logging. + + Returns an AutoLandingController object. + """ + return True, AutoLandingController(cls.__create_key, auto_landing, local_logger) + + def __init__( + self, + class_private_create_key: object, + auto_landing: AutoLanding, + local_logger: logger.Logger, + ) -> None: + """ + Private constructor, use create() method. + """ + assert class_private_create_key is AutoLandingController.__create_key, "Use create() method" + + self.__auto_landing = auto_landing + self.__logger = local_logger + self.__enabled = False + self.__enabled_lock = threading.Lock() + + def is_enabled(self) -> bool: + """ + Check if auto-landing is enabled. + """ + with self.__enabled_lock: + return self.__enabled + + def enable(self) -> bool: + """ + Enable auto-landing system. + + Returns True if successfully enabled, False otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + self.__enabled = True + self.__logger.info("Auto-landing system enabled", True) + return True + else: + self.__logger.warning("Auto-landing system already enabled", True) + return False + + def disable(self) -> bool: + """ + Disable auto-landing system. + + Returns True if successfully disabled, False otherwise. + """ + with self.__enabled_lock: + if self.__enabled: + self.__enabled = False + self.__logger.info("Auto-landing system disabled", True) + return True + else: + self.__logger.warning("Auto-landing system already disabled", True) + return False + + def process_detections( + self, + odometry_detections: merged_odometry_detections.MergedOdometryDetections + ) -> "tuple[bool, AutoLandingInformation | None]": + """ + Process detections if auto-landing is enabled. + + Returns landing information if processing was successful, None otherwise. + """ + with self.__enabled_lock: + if not self.__enabled: + return False, None + + # Process the detections using the auto-landing module + result, landing_info = self.__auto_landing.run(odometry_detections) + return result, landing_info diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 62890e04..6bc687c2 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -12,22 +12,39 @@ from ..common.modules.logger import logger +class AutoLandingCommand: + """ + Command structure for controlling auto-landing operations. + """ + def __init__(self, command: str): + self.command = command + + def auto_landing_worker( fov_x: float, fov_y: float, im_h: float, im_w: float, period: float, + detection_strategy: auto_landing.DetectionSelectionStrategy, input_queue: queue_proxy_wrapper.QueueProxyWrapper, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + command_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ - Worker process. + Worker process for auto-landing operations. - period: Wait time in seconds. - input_queue and output_queue are data queues. - controller is how the main process communicates to this worker process. + fov_x: Horizontal field of view in degrees. + fov_y: Vertical field of view in degrees. + im_h: Image height in pixels. + im_w: Image width in pixels. + period: Wait time in seconds between processing cycles. + detection_strategy: Strategy for selecting detection when multiple targets are present. + input_queue: Queue for receiving merged odometry detections. + output_queue: Queue for sending auto-landing information. + command_queue: Queue for receiving enable/disable commands. + controller: Worker controller for pause/exit management. """ worker_name = pathlib.Path(__file__).stem @@ -42,25 +59,104 @@ def auto_landing_worker( local_logger.info("Logger initialized", True) - result, auto_lander = auto_landing.AutoLanding.create(fov_x, fov_y, im_h, im_w, local_logger) - + # Create auto-landing instance + result, auto_lander = auto_landing.AutoLanding.create( + fov_x, fov_y, im_h, im_w, local_logger, detection_strategy + ) if not result: - local_logger.error("Worker failed to create class object", True) + local_logger.error("Worker failed to create AutoLanding object", True) return # Get Pylance to stop complaining assert auto_lander is not None + # Create auto-landing controller + result, landing_controller = auto_landing.AutoLandingController.create( + auto_lander, local_logger + ) + if not result: + local_logger.error("Worker failed to create AutoLandingController object", True) + return + + # Get Pylance to stop complaining + assert landing_controller is not None + + local_logger.info("Auto-landing worker initialized successfully", True) + while not controller.is_exit_requested(): controller.check_pause() - input_data = input_queue.queue.get() - if input_data is None: - continue + # Process commands first + _process_commands(command_queue, landing_controller, local_logger) + + # Process detections if available + input_data = None + try: + input_data = input_queue.queue.get_nowait() + except: + # No data available, continue + pass - result, value = auto_lander.run(input_data) - if not result: - continue + if input_data is not None: + result, landing_info = landing_controller.process_detections(input_data) + if result and landing_info: + output_queue.queue.put(landing_info) - output_queue.queue.put(value) time.sleep(period) + + +def _process_commands( + command_queue: queue_proxy_wrapper.QueueProxyWrapper, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Process all available commands in the command queue. + + command_queue: Queue containing AutoLandingCommand objects. + landing_controller: Controller instance to execute commands on. + local_logger: Logger for command processing. + """ + while True: + try: + command = command_queue.queue.get_nowait() + if command is None: + break + + if isinstance(command, AutoLandingCommand): + _execute_command(command, landing_controller, local_logger) + else: + local_logger.warning(f"Received invalid command type: {type(command)}", True) + + except: + # No more commands available + break + + +def _execute_command( + command: AutoLandingCommand, + landing_controller: auto_landing.AutoLandingController, + local_logger: logger.Logger, +) -> None: + """ + Execute an auto-landing command. + + command: Command to execute. + landing_controller: Controller instance to execute command on. + local_logger: Logger for command execution. + """ + local_logger.info(f"Executing command: {command.command}", True) + + success = False + if command.command == "enable": + success = landing_controller.enable() + elif command.command == "disable": + success = landing_controller.disable() + else: + local_logger.error(f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True) + return + + if success: + local_logger.info(f"Command '{command.command}' executed successfully", True) + else: + local_logger.error(f"Command '{command.command}' failed to execute", True) diff --git a/modules/common b/modules/common index a48579ef..c0ffaa5c 160000 --- a/modules/common +++ b/modules/common @@ -1 +1 @@ -Subproject commit a48579ef93b187e0a6b72ca941108866e270f089 +Subproject commit c0ffaa5cb4bc55a1eeb0f02d4418eaab10c69b81 diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py new file mode 100644 index 00000000..41647e6d --- /dev/null +++ b/tests/integration/test_auto_landing_worker.py @@ -0,0 +1,715 @@ +#!/usr/bin/env python3 +""" +Integration test for auto_landing_worker. +Tests the worker process, queue operations, command processing, and lifecycle management. +""" + +import multiprocessing as mp +import time +import queue +import pathlib +import sys +import os +import unittest.mock + +# Add project root to path +sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent)) + +from modules.auto_landing import auto_landing_worker +from modules.auto_landing import auto_landing +from modules import merged_odometry_detections +from modules import detections_and_time +from modules.common.modules import orientation +from modules.common.modules import position_local +from modules.common.modules.mavlink import drone_odometry_local +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller +import numpy as np + + +class MockLogger: + """Mock logger that doesn't write to files.""" + + def __init__(self, name): + self.name = name + + def info(self, message, print_to_console=False): + if print_to_console: + pass # Suppress log output during testing + + def warning(self, message, print_to_console=False): + if print_to_console: + pass # Suppress log output during testing + + def error(self, message, print_to_console=False): + if print_to_console: + pass # Suppress log output during testing + + +def mock_logger_create(name, enable_file_logging): + """Mock logger create function.""" + return True, MockLogger(name) + + +class MockDetection: + """Mock detection for testing.""" + + def __init__(self, x1, y1, x2, y2, confidence=0.9): + self.x_1 = x1 + self.y_1 = y1 + self.x_2 = x2 + self.y_2 = y2 + self.confidence = confidence + + def get_centre(self): + """Return center coordinates.""" + center_x = (self.x_1 + self.x_2) / 2 + center_y = (self.y_1 + self.y_2) / 2 + return center_x, center_y + + +def create_mock_merged_detections(detections_list, down_position=-10.0): + """Create mock merged detections for testing.""" + # Create drone position + result, drone_position = position_local.PositionLocal.create( + 0.0, 0.0, down_position + ) + if not result: + raise RuntimeError("Failed to create drone position") + assert drone_position is not None + + # Create drone orientation + result, drone_orientation = orientation.Orientation.create(0.0, 0.0, 0.0) + if not result: + raise RuntimeError("Failed to create drone orientation") + assert drone_orientation is not None + + # Create drone odometry + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + if not result: + raise RuntimeError("Failed to create drone odometry") + assert drone_odometry is not None + + # Create detections + mock_detections = [] + for det_data in detections_list: + mock_det = MockDetection(*det_data) + mock_detections.append(mock_det) + + # Create merged detections + result, merged = merged_odometry_detections.MergedOdometryDetections.create( + drone_odometry, mock_detections + ) + if not result: + raise RuntimeError(f"Failed to create merged detections with {len(mock_detections)} detections") + assert merged is not None + + return merged + + +def test_basic_worker_functionality(): + """Test basic worker functionality - processing detections.""" + print("=== Testing Basic Worker Functionality ===") + + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + detection_strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + print("Starting worker...") + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Enable auto-landing + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + # Wait for command to be processed + time.sleep(0.2) + + # Send test detection + detections = [(100, 100, 200, 200, 0.9)] # x1, y1, x2, y2, confidence + mock_merged = create_mock_merged_detections(detections) + input_queue.queue.put(mock_merged) + + # Wait for processing + time.sleep(0.5) + + # Check output + try: + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + assert hasattr(landing_info, 'angle_x') + assert hasattr(landing_info, 'angle_y') + assert hasattr(landing_info, 'target_dist') + print(f"✅ Worker processed detection successfully") + print(f" Angle X: {landing_info.angle_x:.4f}") + print(f" Angle Y: {landing_info.angle_y:.4f}") + print(f" Target Distance: {landing_info.target_dist:.4f}") + except queue.Empty: + print("❌ No output received from worker") + assert False, "Worker should have produced output" + + # Cleanup + controller.request_exit() + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + worker.join() + + print("✅ Basic worker functionality test passed") + return True + + +def test_worker_commands(): + """Test worker command processing (enable/disable).""" + print("\n=== Testing Worker Commands ===") + + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + detection_strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + print("Starting worker...") + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test disabled state (default) + print("Testing disabled state...") + detections = [(100, 100, 200, 200, 0.9)] + mock_merged = create_mock_merged_detections(detections) + input_queue.queue.put(mock_merged) + + time.sleep(0.3) + + # Should not produce output when disabled + try: + output_queue.queue.get_nowait() + print("❌ Worker should not process detections when disabled") + assert False, "Worker should not process detections when disabled" + except queue.Empty: + print("✅ Worker correctly ignores detections when disabled") + + # Test enable command + print("Testing enable command...") + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + time.sleep(0.2) + + # Send detection after enabling + input_queue.queue.put(mock_merged) + + time.sleep(0.3) + + # Should produce output when enabled + try: + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + print("✅ Worker processes detections when enabled") + except queue.Empty: + print("❌ Worker should process detections when enabled") + assert False, "Worker should process detections when enabled" + + # Test disable command + print("Testing disable command...") + disable_command = auto_landing_worker.AutoLandingCommand("disable") + command_queue.queue.put(disable_command) + + time.sleep(0.2) + + # Send detection after disabling + input_queue.queue.put(mock_merged) + + time.sleep(0.3) + + # Should not produce output when disabled + try: + output_queue.queue.get_nowait() + print("❌ Worker should not process detections after disable") + assert False, "Worker should not process detections after disable" + except queue.Empty: + print("✅ Worker correctly stops processing after disable") + + # Cleanup + controller.request_exit() + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + worker.join() + + print("✅ Worker commands test passed") + return True + + +def test_worker_no_detections(): + """Test worker behavior with no detections.""" + print("\n=== Testing Worker with No Detections ===") + + try: + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + detection_strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + print("Starting worker...") + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Verify worker is alive + if not worker.is_alive(): + print("❌ Worker failed to start") + return False + + # Enable auto-landing + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + time.sleep(0.2) + + # Don't send any detections (this simulates the real scenario where + # the data merge worker doesn't produce MergedOdometryDetections when there are no detections) + print("Not sending any detections to worker (simulating no detections scenario)...") + + print("Waiting to ensure worker doesn't produce output...") + time.sleep(0.3) + + # Should not produce output when no data is sent + print("Checking if worker produced output...") + try: + result = output_queue.queue.get_nowait() + print(f"❌ Worker should not produce output when no data is sent, but got: {result}") + return False + except queue.Empty: + print("✅ Worker correctly handles no input data") + + # Cleanup + controller.request_exit() + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + worker.join() + + print("✅ No detections test passed") + return True + + except Exception as e: + print(f"❌ Test failed with exception: {e}") + # Try to cleanup if possible + try: + controller.request_exit() + worker.join(timeout=1.0) + except: + pass + return False + + +def test_worker_multiple_detections(): + """Test worker with multiple detections and different strategies.""" + print("\n=== Testing Worker with Multiple Detections ===") + + strategies = [ + auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER, + auto_landing.DetectionSelectionStrategy.LARGEST_AREA, + auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE, + ] + + for strategy in strategies: + print(f"Testing strategy: {strategy.value}") + + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + worker.start() + time.sleep(0.5) + + # Enable auto-landing + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + time.sleep(0.2) + + # Send multiple detections + detections = [ + (50, 50, 100, 100, 0.7), # Top-left, lower confidence + (300, 200, 400, 300, 0.9), # Center-right, high confidence + (200, 180, 300, 280, 0.8), # Near center, medium confidence + ] + mock_merged = create_mock_merged_detections(detections) + input_queue.queue.put(mock_merged) + + time.sleep(0.5) + + # Should produce output + try: + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + print(f"✅ Strategy {strategy.value} produced output") + except queue.Empty: + print(f"❌ Strategy {strategy.value} failed to produce output") + assert False, f"Strategy {strategy.value} should produce output" + + # Cleanup + controller.request_exit() + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + worker.join() + + print("✅ Multiple detections test passed") + return True + + +def test_worker_lifecycle(): + """Test worker lifecycle management (pause, resume, exit).""" + print("\n=== Testing Worker Lifecycle ===") + + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + detection_strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + print("Starting worker...") + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Verify worker is alive + assert worker.is_alive(), "Worker should be alive after start" + print("✅ Worker started successfully") + + # Test pause + print("Testing pause...") + controller.request_pause() + time.sleep(0.2) + + # Worker should still be alive but paused + assert worker.is_alive(), "Worker should still be alive when paused" + print("✅ Worker paused successfully") + + # Test resume + print("Testing resume...") + controller.request_resume() + time.sleep(0.2) + + # Worker should still be alive and resumed + assert worker.is_alive(), "Worker should still be alive after resume" + print("✅ Worker resumed successfully") + + # Test exit + print("Testing exit...") + controller.request_exit() + + # Give worker time to exit gracefully + worker.join(timeout=2.0) + + # Worker should have exited + assert not worker.is_alive(), "Worker should have exited" + print("✅ Worker exited successfully") + + # Cleanup + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + + print("✅ Worker lifecycle test passed") + return True + + +def test_worker_invalid_commands(): + """Test worker behavior with invalid commands.""" + print("\n=== Testing Worker with Invalid Commands ===") + + # Setup + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Worker parameters + fov_x = 60.0 + fov_y = 45.0 + im_h = 480 + im_w = 640 + period = 0.1 + detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + + # Start worker + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + fov_x, + fov_y, + im_h, + im_w, + period, + detection_strategy, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + print("Starting worker...") + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test invalid command + print("Testing invalid command...") + invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") + command_queue.queue.put(invalid_command) + + time.sleep(0.3) + + # Worker should still be alive after invalid command + assert worker.is_alive(), "Worker should still be alive after invalid command" + print("✅ Worker handled invalid command gracefully") + + # Test invalid command type + print("Testing invalid command type...") + command_queue.queue.put("not_a_command_object") + + time.sleep(0.3) + + # Worker should still be alive after invalid command type + assert worker.is_alive(), "Worker should still be alive after invalid command type" + print("✅ Worker handled invalid command type gracefully") + + # Cleanup + controller.request_exit() + input_queue.fill_and_drain_queue() + output_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + worker.join() + + print("✅ Invalid commands test passed") + return True + + +def main(): + """Run all tests.""" + print("Starting Auto Landing Worker Integration Tests...") + + # Create logs directory structure required by logger + logs_dir = pathlib.Path("logs") + logs_dir.mkdir(exist_ok=True) + + # Create timestamp-based subdirectory (logger expects this) + import datetime + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + session_dir = logs_dir / timestamp + session_dir.mkdir(exist_ok=True) + + # Clean up any existing log files from previous test runs + for log_file in session_dir.glob("*auto_landing_worker*"): + try: + log_file.unlink() + except: + pass # Ignore errors if file doesn't exist or is locked + + tests = [ + test_basic_worker_functionality, + test_worker_commands, + test_worker_no_detections, + test_worker_multiple_detections, + test_worker_lifecycle, + test_worker_invalid_commands, + ] + + passed = 0 + failed = 0 + + for test in tests: + try: + if test(): + passed += 1 + else: + failed += 1 + except Exception as e: + print(f"❌ Test {test.__name__} failed with exception: {e}") + failed += 1 + + # Clean up log files after tests + for log_file in session_dir.glob("*auto_landing_worker*"): + try: + log_file.unlink() + except: + pass # Ignore errors if file doesn't exist or is locked + + # Clean up session directory if empty + try: + session_dir.rmdir() + except: + pass # Ignore if directory is not empty or doesn't exist + + print("\n" + "=" * 50) + print("TEST RESULTS") + print("=" * 50) + print(f"Passed: {passed}") + print(f"Failed: {failed}") + print(f"Total: {passed + failed}") + + if failed == 0: + print("🎉 All tests passed!") + return 0 + else: + print("❌ Some tests failed!") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) From 71511763cff26738daa8d65b4416f829ad04b370 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 22:29:07 -0400 Subject: [PATCH 2/9] pass black and flake8 --- modules/auto_landing/auto_landing.py | 80 ++--- modules/auto_landing/auto_landing_worker.py | 19 +- tests/integration/test_auto_landing_worker.py | 273 +++++++++--------- 3 files changed, 190 insertions(+), 182 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 3b7ac5b1..1ce05c7d 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -16,6 +16,7 @@ class DetectionSelectionStrategy(Enum): """ Strategies for selecting which detection to use when multiple targets are detected. """ + NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence @@ -69,7 +70,9 @@ def create( Returns an AutoLanding object. """ - return True, AutoLanding(cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy) + return True, AutoLanding( + cls.__create_key, fov_x, fov_y, im_h, im_w, local_logger, selection_strategy + ) def __init__( self, @@ -96,40 +99,42 @@ def __init__( def _select_detection(self, detections: list) -> Optional[int]: """ Select which detection to use based on the configured strategy. - + Returns the index of the selected detection, or None if no suitable detection found. """ if not detections: return None - + if len(detections) == 1: return 0 - + if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: return 0 - + elif self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: # Find detection closest to image center image_center_x = self.im_w / 2 image_center_y = self.im_h / 2 - - min_distance = float('inf') + + min_distance = float("inf") selected_index = 0 - + for i, detection in enumerate(detections): det_center_x, det_center_y = detection.get_centre() - distance = math.sqrt((det_center_x - image_center_x)**2 + (det_center_y - image_center_y)**2) + distance = math.sqrt( + (det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2 + ) if distance < min_distance: min_distance = distance selected_index = i - + return selected_index - + elif self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: # Find detection with largest bounding box area max_area = 0 selected_index = 0 - + for i, detection in enumerate(detections): width = detection.x_2 - detection.x_1 height = detection.y_2 - detection.y_1 @@ -137,21 +142,21 @@ def _select_detection(self, detections: list) -> Optional[int]: if area > max_area: max_area = area selected_index = i - + return selected_index - + elif self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: # Find detection with highest confidence max_confidence = 0 selected_index = 0 - + for i, detection in enumerate(detections): if detection.confidence > max_confidence: max_confidence = detection.confidence selected_index = i - + return selected_index - + # Default fallback return 0 @@ -165,26 +170,26 @@ def run( Returns an AutoLandingInformation object. """ - + # Check if we have any detections if not odometry_detections.detections: self.__logger.warning("No detections available for auto-landing", True) return False, None - + # Select which detection to use selected_index = self._select_detection(odometry_detections.detections) if selected_index is None: self.__logger.error("Failed to select detection for auto-landing", True) return False, None - + selected_detection = odometry_detections.detections[selected_index] - + # Log selection information self.__logger.info( f"Selected detection {selected_index + 1}/{len(odometry_detections.detections)} using strategy: {self.__selection_strategy.value}", True, ) - + x_center, y_center = selected_detection.get_centre() angle_x = (x_center - self.im_w / 2) * (self.fov_x * (math.pi / 180)) / self.im_w @@ -211,9 +216,9 @@ class AutoLandingController: """ Simple controller for turning auto-landing on/off. """ - + __create_key = object() - + @classmethod def create( cls, @@ -222,14 +227,14 @@ def create( ) -> "tuple[bool, AutoLandingController | None]": """ Create an AutoLandingController instance. - + auto_landing: The AutoLanding instance to control. local_logger: Logger instance for logging. - + Returns an AutoLandingController object. """ return True, AutoLandingController(cls.__create_key, auto_landing, local_logger) - + def __init__( self, class_private_create_key: object, @@ -240,23 +245,23 @@ def __init__( Private constructor, use create() method. """ assert class_private_create_key is AutoLandingController.__create_key, "Use create() method" - + self.__auto_landing = auto_landing self.__logger = local_logger self.__enabled = False self.__enabled_lock = threading.Lock() - + def is_enabled(self) -> bool: """ Check if auto-landing is enabled. """ with self.__enabled_lock: return self.__enabled - + def enable(self) -> bool: """ Enable auto-landing system. - + Returns True if successfully enabled, False otherwise. """ with self.__enabled_lock: @@ -267,11 +272,11 @@ def enable(self) -> bool: else: self.__logger.warning("Auto-landing system already enabled", True) return False - + def disable(self) -> bool: """ Disable auto-landing system. - + Returns True if successfully disabled, False otherwise. """ with self.__enabled_lock: @@ -282,20 +287,19 @@ def disable(self) -> bool: else: self.__logger.warning("Auto-landing system already disabled", True) return False - + def process_detections( - self, - odometry_detections: merged_odometry_detections.MergedOdometryDetections + self, odometry_detections: merged_odometry_detections.MergedOdometryDetections ) -> "tuple[bool, AutoLandingInformation | None]": """ Process detections if auto-landing is enabled. - + Returns landing information if processing was successful, None otherwise. """ with self.__enabled_lock: if not self.__enabled: return False, None - + # Process the detections using the auto-landing module result, landing_info = self.__auto_landing.run(odometry_detections) return result, landing_info diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index 6bc687c2..f479c630 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -16,7 +16,8 @@ class AutoLandingCommand: """ Command structure for controlling auto-landing operations. """ - def __init__(self, command: str): + + def __init__(self, command: str) -> None: self.command = command @@ -112,7 +113,7 @@ def _process_commands( ) -> None: """ Process all available commands in the command queue. - + command_queue: Queue containing AutoLandingCommand objects. landing_controller: Controller instance to execute commands on. local_logger: Logger for command processing. @@ -122,12 +123,12 @@ def _process_commands( command = command_queue.queue.get_nowait() if command is None: break - + if isinstance(command, AutoLandingCommand): _execute_command(command, landing_controller, local_logger) else: local_logger.warning(f"Received invalid command type: {type(command)}", True) - + except: # No more commands available break @@ -140,22 +141,24 @@ def _execute_command( ) -> None: """ Execute an auto-landing command. - + command: Command to execute. landing_controller: Controller instance to execute command on. local_logger: Logger for command execution. """ local_logger.info(f"Executing command: {command.command}", True) - + success = False if command.command == "enable": success = landing_controller.enable() elif command.command == "disable": success = landing_controller.disable() else: - local_logger.error(f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True) + local_logger.error( + f"Unknown command: {command.command}. Only 'enable' and 'disable' are supported.", True + ) return - + if success: local_logger.info(f"Command '{command.command}' executed successfully", True) else: diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index 41647e6d..b9bb59bb 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -29,61 +29,59 @@ class MockLogger: """Mock logger that doesn't write to files.""" - - def __init__(self, name): + + def __init__(self, name: str) -> None: self.name = name - - def info(self, message, print_to_console=False): + + def info(self, message: str, print_to_console: bool = False) -> None: if print_to_console: pass # Suppress log output during testing - - def warning(self, message, print_to_console=False): + + def warning(self, message: str, print_to_console: bool = False) -> None: if print_to_console: pass # Suppress log output during testing - - def error(self, message, print_to_console=False): + + def error(self, message: str, print_to_console: bool = False) -> None: if print_to_console: pass # Suppress log output during testing -def mock_logger_create(name, enable_file_logging): +def mock_logger_create(name: str, enable_file_logging: bool) -> tuple[bool, MockLogger]: """Mock logger create function.""" return True, MockLogger(name) class MockDetection: """Mock detection for testing.""" - - def __init__(self, x1, y1, x2, y2, confidence=0.9): + + def __init__(self, x1: float, y1: float, x2: float, y2: float, confidence: float = 0.9) -> None: self.x_1 = x1 self.y_1 = y1 self.x_2 = x2 self.y_2 = y2 self.confidence = confidence - - def get_centre(self): + + def get_centre(self) -> tuple[float, float]: """Return center coordinates.""" center_x = (self.x_1 + self.x_2) / 2 center_y = (self.y_1 + self.y_2) / 2 return center_x, center_y -def create_mock_merged_detections(detections_list, down_position=-10.0): +def create_mock_merged_detections(detections_list: list[tuple[float, float, float, float, float]], down_position: float = -10.0) -> merged_odometry_detections.MergedOdometryDetections: """Create mock merged detections for testing.""" # Create drone position - result, drone_position = position_local.PositionLocal.create( - 0.0, 0.0, down_position - ) + result, drone_position = position_local.PositionLocal.create(0.0, 0.0, down_position) if not result: raise RuntimeError("Failed to create drone position") assert drone_position is not None - + # Create drone orientation result, drone_orientation = orientation.Orientation.create(0.0, 0.0, 0.0) if not result: raise RuntimeError("Failed to create drone orientation") assert drone_orientation is not None - + # Create drone odometry result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( drone_position, drone_orientation @@ -91,36 +89,38 @@ def create_mock_merged_detections(detections_list, down_position=-10.0): if not result: raise RuntimeError("Failed to create drone odometry") assert drone_odometry is not None - + # Create detections mock_detections = [] for det_data in detections_list: mock_det = MockDetection(*det_data) mock_detections.append(mock_det) - + # Create merged detections result, merged = merged_odometry_detections.MergedOdometryDetections.create( drone_odometry, mock_detections ) if not result: - raise RuntimeError(f"Failed to create merged detections with {len(mock_detections)} detections") + raise RuntimeError( + f"Failed to create merged detections with {len(mock_detections)} detections" + ) assert merged is not None - + return merged -def test_basic_worker_functionality(): +def test_basic_worker_functionality() -> bool: """Test basic worker functionality - processing detections.""" print("=== Testing Basic Worker Functionality ===") - + # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 @@ -128,7 +128,7 @@ def test_basic_worker_functionality(): im_w = 640 period = 0.1 detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -145,35 +145,35 @@ def test_basic_worker_functionality(): controller, ), ) - + print("Starting worker...") worker.start() - + # Give worker time to initialize time.sleep(0.5) - + # Enable auto-landing enable_command = auto_landing_worker.AutoLandingCommand("enable") command_queue.queue.put(enable_command) - + # Wait for command to be processed time.sleep(0.2) - + # Send test detection detections = [(100, 100, 200, 200, 0.9)] # x1, y1, x2, y2, confidence mock_merged = create_mock_merged_detections(detections) input_queue.queue.put(mock_merged) - + # Wait for processing time.sleep(0.5) - + # Check output try: landing_info = output_queue.queue.get_nowait() assert landing_info is not None - assert hasattr(landing_info, 'angle_x') - assert hasattr(landing_info, 'angle_y') - assert hasattr(landing_info, 'target_dist') + assert hasattr(landing_info, "angle_x") + assert hasattr(landing_info, "angle_y") + assert hasattr(landing_info, "target_dist") print(f"✅ Worker processed detection successfully") print(f" Angle X: {landing_info.angle_x:.4f}") print(f" Angle Y: {landing_info.angle_y:.4f}") @@ -181,30 +181,30 @@ def test_basic_worker_functionality(): except queue.Empty: print("❌ No output received from worker") assert False, "Worker should have produced output" - + # Cleanup controller.request_exit() input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() worker.join() - + print("✅ Basic worker functionality test passed") return True -def test_worker_commands(): +def test_worker_commands() -> bool: """Test worker command processing (enable/disable).""" print("\n=== Testing Worker Commands ===") - + # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 @@ -212,7 +212,7 @@ def test_worker_commands(): im_w = 640 period = 0.1 detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -229,21 +229,21 @@ def test_worker_commands(): controller, ), ) - + print("Starting worker...") worker.start() - + # Give worker time to initialize time.sleep(0.5) - + # Test disabled state (default) print("Testing disabled state...") detections = [(100, 100, 200, 200, 0.9)] mock_merged = create_mock_merged_detections(detections) input_queue.queue.put(mock_merged) - + time.sleep(0.3) - + # Should not produce output when disabled try: output_queue.queue.get_nowait() @@ -251,19 +251,19 @@ def test_worker_commands(): assert False, "Worker should not process detections when disabled" except queue.Empty: print("✅ Worker correctly ignores detections when disabled") - + # Test enable command print("Testing enable command...") enable_command = auto_landing_worker.AutoLandingCommand("enable") command_queue.queue.put(enable_command) - + time.sleep(0.2) - + # Send detection after enabling input_queue.queue.put(mock_merged) - + time.sleep(0.3) - + # Should produce output when enabled try: landing_info = output_queue.queue.get_nowait() @@ -272,19 +272,19 @@ def test_worker_commands(): except queue.Empty: print("❌ Worker should process detections when enabled") assert False, "Worker should process detections when enabled" - + # Test disable command print("Testing disable command...") disable_command = auto_landing_worker.AutoLandingCommand("disable") command_queue.queue.put(disable_command) - + time.sleep(0.2) - + # Send detection after disabling input_queue.queue.put(mock_merged) - + time.sleep(0.3) - + # Should not produce output when disabled try: output_queue.queue.get_nowait() @@ -292,31 +292,31 @@ def test_worker_commands(): assert False, "Worker should not process detections after disable" except queue.Empty: print("✅ Worker correctly stops processing after disable") - + # Cleanup controller.request_exit() input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() worker.join() - + print("✅ Worker commands test passed") return True -def test_worker_no_detections(): +def test_worker_no_detections() -> bool: """Test worker behavior with no detections.""" print("\n=== Testing Worker with No Detections ===") - + try: # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 @@ -324,7 +324,7 @@ def test_worker_no_detections(): im_w = 640 period = 0.1 detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -341,31 +341,31 @@ def test_worker_no_detections(): controller, ), ) - + print("Starting worker...") worker.start() - + # Give worker time to initialize time.sleep(0.5) - + # Verify worker is alive if not worker.is_alive(): print("❌ Worker failed to start") return False - + # Enable auto-landing enable_command = auto_landing_worker.AutoLandingCommand("enable") command_queue.queue.put(enable_command) - + time.sleep(0.2) - + # Don't send any detections (this simulates the real scenario where # the data merge worker doesn't produce MergedOdometryDetections when there are no detections) print("Not sending any detections to worker (simulating no detections scenario)...") - + print("Waiting to ensure worker doesn't produce output...") time.sleep(0.3) - + # Should not produce output when no data is sent print("Checking if worker produced output...") try: @@ -374,17 +374,17 @@ def test_worker_no_detections(): return False except queue.Empty: print("✅ Worker correctly handles no input data") - + # Cleanup controller.request_exit() input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() worker.join() - + print("✅ No detections test passed") return True - + except Exception as e: print(f"❌ Test failed with exception: {e}") # Try to cleanup if possible @@ -396,34 +396,34 @@ def test_worker_no_detections(): return False -def test_worker_multiple_detections(): +def test_worker_multiple_detections() -> bool: """Test worker with multiple detections and different strategies.""" print("\n=== Testing Worker with Multiple Detections ===") - + strategies = [ auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER, auto_landing.DetectionSelectionStrategy.LARGEST_AREA, auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE, ] - + for strategy in strategies: print(f"Testing strategy: {strategy.value}") - + # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 im_h = 480 im_w = 640 period = 0.1 - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -440,26 +440,26 @@ def test_worker_multiple_detections(): controller, ), ) - + worker.start() time.sleep(0.5) - + # Enable auto-landing enable_command = auto_landing_worker.AutoLandingCommand("enable") command_queue.queue.put(enable_command) time.sleep(0.2) - + # Send multiple detections detections = [ - (50, 50, 100, 100, 0.7), # Top-left, lower confidence + (50, 50, 100, 100, 0.7), # Top-left, lower confidence (300, 200, 400, 300, 0.9), # Center-right, high confidence (200, 180, 300, 280, 0.8), # Near center, medium confidence ] mock_merged = create_mock_merged_detections(detections) input_queue.queue.put(mock_merged) - + time.sleep(0.5) - + # Should produce output try: landing_info = output_queue.queue.get_nowait() @@ -468,30 +468,30 @@ def test_worker_multiple_detections(): except queue.Empty: print(f"❌ Strategy {strategy.value} failed to produce output") assert False, f"Strategy {strategy.value} should produce output" - + # Cleanup controller.request_exit() input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() worker.join() - + print("✅ Multiple detections test passed") return True -def test_worker_lifecycle(): +def test_worker_lifecycle() -> bool: """Test worker lifecycle management (pause, resume, exit).""" print("\n=== Testing Worker Lifecycle ===") - + # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 @@ -499,7 +499,7 @@ def test_worker_lifecycle(): im_w = 640 period = 0.1 detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -516,67 +516,67 @@ def test_worker_lifecycle(): controller, ), ) - + print("Starting worker...") worker.start() - + # Give worker time to initialize time.sleep(0.5) - + # Verify worker is alive assert worker.is_alive(), "Worker should be alive after start" print("✅ Worker started successfully") - + # Test pause print("Testing pause...") controller.request_pause() time.sleep(0.2) - + # Worker should still be alive but paused assert worker.is_alive(), "Worker should still be alive when paused" print("✅ Worker paused successfully") - + # Test resume print("Testing resume...") controller.request_resume() time.sleep(0.2) - + # Worker should still be alive and resumed assert worker.is_alive(), "Worker should still be alive after resume" print("✅ Worker resumed successfully") - + # Test exit print("Testing exit...") controller.request_exit() - + # Give worker time to exit gracefully worker.join(timeout=2.0) - + # Worker should have exited assert not worker.is_alive(), "Worker should have exited" print("✅ Worker exited successfully") - + # Cleanup input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() - + print("✅ Worker lifecycle test passed") return True -def test_worker_invalid_commands(): +def test_worker_invalid_commands() -> bool: """Test worker behavior with invalid commands.""" print("\n=== Testing Worker with Invalid Commands ===") - + # Setup controller = worker_controller.WorkerController() mp_manager = mp.Manager() - + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - + # Worker parameters fov_x = 60.0 fov_y = 45.0 @@ -584,7 +584,7 @@ def test_worker_invalid_commands(): im_w = 640 period = 0.1 detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - + # Start worker worker = mp.Process( target=auto_landing_worker.auto_landing_worker, @@ -601,66 +601,67 @@ def test_worker_invalid_commands(): controller, ), ) - + print("Starting worker...") worker.start() - + # Give worker time to initialize time.sleep(0.5) - + # Test invalid command print("Testing invalid command...") invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") command_queue.queue.put(invalid_command) - + time.sleep(0.3) - + # Worker should still be alive after invalid command assert worker.is_alive(), "Worker should still be alive after invalid command" print("✅ Worker handled invalid command gracefully") - + # Test invalid command type print("Testing invalid command type...") command_queue.queue.put("not_a_command_object") - + time.sleep(0.3) - + # Worker should still be alive after invalid command type assert worker.is_alive(), "Worker should still be alive after invalid command type" print("✅ Worker handled invalid command type gracefully") - + # Cleanup controller.request_exit() input_queue.fill_and_drain_queue() output_queue.fill_and_drain_queue() command_queue.fill_and_drain_queue() worker.join() - + print("✅ Invalid commands test passed") return True -def main(): +def main() -> int: """Run all tests.""" print("Starting Auto Landing Worker Integration Tests...") - + # Create logs directory structure required by logger logs_dir = pathlib.Path("logs") logs_dir.mkdir(exist_ok=True) - + # Create timestamp-based subdirectory (logger expects this) import datetime + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") session_dir = logs_dir / timestamp session_dir.mkdir(exist_ok=True) - + # Clean up any existing log files from previous test runs for log_file in session_dir.glob("*auto_landing_worker*"): try: log_file.unlink() except: pass # Ignore errors if file doesn't exist or is locked - + tests = [ test_basic_worker_functionality, test_worker_commands, @@ -669,10 +670,10 @@ def main(): test_worker_lifecycle, test_worker_invalid_commands, ] - + passed = 0 failed = 0 - + for test in tests: try: if test(): @@ -682,27 +683,27 @@ def main(): except Exception as e: print(f"❌ Test {test.__name__} failed with exception: {e}") failed += 1 - + # Clean up log files after tests for log_file in session_dir.glob("*auto_landing_worker*"): try: log_file.unlink() except: pass # Ignore errors if file doesn't exist or is locked - + # Clean up session directory if empty try: session_dir.rmdir() except: pass # Ignore if directory is not empty or doesn't exist - + print("\n" + "=" * 50) print("TEST RESULTS") print("=" * 50) print(f"Passed: {passed}") print(f"Failed: {failed}") print(f"Total: {passed + failed}") - + if failed == 0: print("🎉 All tests passed!") return 0 From a16addebd5f59f2f36bc8ee400d4895668608a6a Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 22:44:01 -0400 Subject: [PATCH 3/9] Pass pylint --- modules/auto_landing/auto_landing.py | 12 +- modules/auto_landing/auto_landing_worker.py | 2 +- tests/integration/test_auto_landing_worker.py | 1434 +++++++++-------- 3 files changed, 724 insertions(+), 724 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 1ce05c7d..14cddeba 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -111,7 +111,7 @@ def _select_detection(self, detections: list) -> Optional[int]: if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: return 0 - elif self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: + if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: # Find detection closest to image center image_center_x = self.im_w / 2 image_center_y = self.im_h / 2 @@ -269,9 +269,8 @@ def enable(self) -> bool: self.__enabled = True self.__logger.info("Auto-landing system enabled", True) return True - else: - self.__logger.warning("Auto-landing system already enabled", True) - return False + self.__logger.warning("Auto-landing system already enabled", True) + return False def disable(self) -> bool: """ @@ -284,9 +283,8 @@ def disable(self) -> bool: self.__enabled = False self.__logger.info("Auto-landing system disabled", True) return True - else: - self.__logger.warning("Auto-landing system already disabled", True) - return False + self.__logger.warning("Auto-landing system already disabled", True) + return False def process_detections( self, odometry_detections: merged_odometry_detections.MergedOdometryDetections diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index f479c630..ddb09fda 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -129,7 +129,7 @@ def _process_commands( else: local_logger.warning(f"Received invalid command type: {type(command)}", True) - except: + except Exception: # No more commands available break diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py index b9bb59bb..897aa231 100644 --- a/tests/integration/test_auto_landing_worker.py +++ b/tests/integration/test_auto_landing_worker.py @@ -1,716 +1,718 @@ -#!/usr/bin/env python3 -""" -Integration test for auto_landing_worker. -Tests the worker process, queue operations, command processing, and lifecycle management. -""" - -import multiprocessing as mp -import time -import queue -import pathlib -import sys -import os -import unittest.mock - -# Add project root to path -sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent)) - -from modules.auto_landing import auto_landing_worker -from modules.auto_landing import auto_landing -from modules import merged_odometry_detections -from modules import detections_and_time -from modules.common.modules import orientation -from modules.common.modules import position_local -from modules.common.modules.mavlink import drone_odometry_local -from utilities.workers import queue_proxy_wrapper -from utilities.workers import worker_controller -import numpy as np - - -class MockLogger: - """Mock logger that doesn't write to files.""" - - def __init__(self, name: str) -> None: - self.name = name - - def info(self, message: str, print_to_console: bool = False) -> None: - if print_to_console: - pass # Suppress log output during testing - - def warning(self, message: str, print_to_console: bool = False) -> None: - if print_to_console: - pass # Suppress log output during testing - - def error(self, message: str, print_to_console: bool = False) -> None: - if print_to_console: - pass # Suppress log output during testing - - -def mock_logger_create(name: str, enable_file_logging: bool) -> tuple[bool, MockLogger]: - """Mock logger create function.""" - return True, MockLogger(name) - - -class MockDetection: - """Mock detection for testing.""" - - def __init__(self, x1: float, y1: float, x2: float, y2: float, confidence: float = 0.9) -> None: - self.x_1 = x1 - self.y_1 = y1 - self.x_2 = x2 - self.y_2 = y2 - self.confidence = confidence - - def get_centre(self) -> tuple[float, float]: - """Return center coordinates.""" - center_x = (self.x_1 + self.x_2) / 2 - center_y = (self.y_1 + self.y_2) / 2 - return center_x, center_y - - -def create_mock_merged_detections(detections_list: list[tuple[float, float, float, float, float]], down_position: float = -10.0) -> merged_odometry_detections.MergedOdometryDetections: - """Create mock merged detections for testing.""" - # Create drone position - result, drone_position = position_local.PositionLocal.create(0.0, 0.0, down_position) - if not result: - raise RuntimeError("Failed to create drone position") - assert drone_position is not None - - # Create drone orientation - result, drone_orientation = orientation.Orientation.create(0.0, 0.0, 0.0) - if not result: - raise RuntimeError("Failed to create drone orientation") - assert drone_orientation is not None - - # Create drone odometry - result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( - drone_position, drone_orientation - ) - if not result: - raise RuntimeError("Failed to create drone odometry") - assert drone_odometry is not None - - # Create detections - mock_detections = [] - for det_data in detections_list: - mock_det = MockDetection(*det_data) - mock_detections.append(mock_det) - - # Create merged detections - result, merged = merged_odometry_detections.MergedOdometryDetections.create( - drone_odometry, mock_detections - ) - if not result: - raise RuntimeError( - f"Failed to create merged detections with {len(mock_detections)} detections" - ) - assert merged is not None - - return merged - - -def test_basic_worker_functionality() -> bool: - """Test basic worker functionality - processing detections.""" - print("=== Testing Basic Worker Functionality ===") - - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - detection_strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - print("Starting worker...") - worker.start() - - # Give worker time to initialize - time.sleep(0.5) - - # Enable auto-landing - enable_command = auto_landing_worker.AutoLandingCommand("enable") - command_queue.queue.put(enable_command) - - # Wait for command to be processed - time.sleep(0.2) - - # Send test detection - detections = [(100, 100, 200, 200, 0.9)] # x1, y1, x2, y2, confidence - mock_merged = create_mock_merged_detections(detections) - input_queue.queue.put(mock_merged) - - # Wait for processing - time.sleep(0.5) - - # Check output - try: - landing_info = output_queue.queue.get_nowait() - assert landing_info is not None - assert hasattr(landing_info, "angle_x") - assert hasattr(landing_info, "angle_y") - assert hasattr(landing_info, "target_dist") - print(f"✅ Worker processed detection successfully") - print(f" Angle X: {landing_info.angle_x:.4f}") - print(f" Angle Y: {landing_info.angle_y:.4f}") - print(f" Target Distance: {landing_info.target_dist:.4f}") - except queue.Empty: - print("❌ No output received from worker") - assert False, "Worker should have produced output" - - # Cleanup - controller.request_exit() - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - worker.join() - - print("✅ Basic worker functionality test passed") - return True - - -def test_worker_commands() -> bool: - """Test worker command processing (enable/disable).""" - print("\n=== Testing Worker Commands ===") - - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - detection_strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - print("Starting worker...") - worker.start() - - # Give worker time to initialize - time.sleep(0.5) - - # Test disabled state (default) - print("Testing disabled state...") - detections = [(100, 100, 200, 200, 0.9)] - mock_merged = create_mock_merged_detections(detections) - input_queue.queue.put(mock_merged) - - time.sleep(0.3) - - # Should not produce output when disabled - try: - output_queue.queue.get_nowait() - print("❌ Worker should not process detections when disabled") - assert False, "Worker should not process detections when disabled" - except queue.Empty: - print("✅ Worker correctly ignores detections when disabled") - - # Test enable command - print("Testing enable command...") - enable_command = auto_landing_worker.AutoLandingCommand("enable") - command_queue.queue.put(enable_command) - - time.sleep(0.2) - - # Send detection after enabling - input_queue.queue.put(mock_merged) - - time.sleep(0.3) - - # Should produce output when enabled - try: - landing_info = output_queue.queue.get_nowait() - assert landing_info is not None - print("✅ Worker processes detections when enabled") - except queue.Empty: - print("❌ Worker should process detections when enabled") - assert False, "Worker should process detections when enabled" - - # Test disable command - print("Testing disable command...") - disable_command = auto_landing_worker.AutoLandingCommand("disable") - command_queue.queue.put(disable_command) - - time.sleep(0.2) - - # Send detection after disabling - input_queue.queue.put(mock_merged) - - time.sleep(0.3) - - # Should not produce output when disabled - try: - output_queue.queue.get_nowait() - print("❌ Worker should not process detections after disable") - assert False, "Worker should not process detections after disable" - except queue.Empty: - print("✅ Worker correctly stops processing after disable") - - # Cleanup - controller.request_exit() - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - worker.join() - - print("✅ Worker commands test passed") - return True - - -def test_worker_no_detections() -> bool: - """Test worker behavior with no detections.""" - print("\n=== Testing Worker with No Detections ===") - - try: - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - detection_strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - print("Starting worker...") - worker.start() - - # Give worker time to initialize - time.sleep(0.5) - - # Verify worker is alive - if not worker.is_alive(): - print("❌ Worker failed to start") - return False - - # Enable auto-landing - enable_command = auto_landing_worker.AutoLandingCommand("enable") - command_queue.queue.put(enable_command) - - time.sleep(0.2) - - # Don't send any detections (this simulates the real scenario where - # the data merge worker doesn't produce MergedOdometryDetections when there are no detections) - print("Not sending any detections to worker (simulating no detections scenario)...") - - print("Waiting to ensure worker doesn't produce output...") - time.sleep(0.3) - - # Should not produce output when no data is sent - print("Checking if worker produced output...") - try: - result = output_queue.queue.get_nowait() - print(f"❌ Worker should not produce output when no data is sent, but got: {result}") - return False - except queue.Empty: - print("✅ Worker correctly handles no input data") - - # Cleanup - controller.request_exit() - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - worker.join() - - print("✅ No detections test passed") - return True - - except Exception as e: - print(f"❌ Test failed with exception: {e}") - # Try to cleanup if possible - try: - controller.request_exit() - worker.join(timeout=1.0) - except: - pass - return False - - -def test_worker_multiple_detections() -> bool: - """Test worker with multiple detections and different strategies.""" - print("\n=== Testing Worker with Multiple Detections ===") - - strategies = [ - auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER, - auto_landing.DetectionSelectionStrategy.LARGEST_AREA, - auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE, - ] - - for strategy in strategies: - print(f"Testing strategy: {strategy.value}") - - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - worker.start() - time.sleep(0.5) - - # Enable auto-landing - enable_command = auto_landing_worker.AutoLandingCommand("enable") - command_queue.queue.put(enable_command) - time.sleep(0.2) - - # Send multiple detections - detections = [ - (50, 50, 100, 100, 0.7), # Top-left, lower confidence - (300, 200, 400, 300, 0.9), # Center-right, high confidence - (200, 180, 300, 280, 0.8), # Near center, medium confidence - ] - mock_merged = create_mock_merged_detections(detections) - input_queue.queue.put(mock_merged) - - time.sleep(0.5) - - # Should produce output - try: - landing_info = output_queue.queue.get_nowait() - assert landing_info is not None - print(f"✅ Strategy {strategy.value} produced output") - except queue.Empty: - print(f"❌ Strategy {strategy.value} failed to produce output") - assert False, f"Strategy {strategy.value} should produce output" - - # Cleanup - controller.request_exit() - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - worker.join() - - print("✅ Multiple detections test passed") - return True - - -def test_worker_lifecycle() -> bool: - """Test worker lifecycle management (pause, resume, exit).""" - print("\n=== Testing Worker Lifecycle ===") - - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - detection_strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - print("Starting worker...") - worker.start() - - # Give worker time to initialize - time.sleep(0.5) - - # Verify worker is alive - assert worker.is_alive(), "Worker should be alive after start" - print("✅ Worker started successfully") - - # Test pause - print("Testing pause...") - controller.request_pause() - time.sleep(0.2) - - # Worker should still be alive but paused - assert worker.is_alive(), "Worker should still be alive when paused" - print("✅ Worker paused successfully") - - # Test resume - print("Testing resume...") - controller.request_resume() - time.sleep(0.2) - - # Worker should still be alive and resumed - assert worker.is_alive(), "Worker should still be alive after resume" - print("✅ Worker resumed successfully") - - # Test exit - print("Testing exit...") - controller.request_exit() - - # Give worker time to exit gracefully - worker.join(timeout=2.0) - - # Worker should have exited - assert not worker.is_alive(), "Worker should have exited" - print("✅ Worker exited successfully") - - # Cleanup - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - - print("✅ Worker lifecycle test passed") - return True - - -def test_worker_invalid_commands() -> bool: - """Test worker behavior with invalid commands.""" - print("\n=== Testing Worker with Invalid Commands ===") - - # Setup - controller = worker_controller.WorkerController() - mp_manager = mp.Manager() - - input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - - # Worker parameters - fov_x = 60.0 - fov_y = 45.0 - im_h = 480 - im_w = 640 - period = 0.1 - detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - - # Start worker - worker = mp.Process( - target=auto_landing_worker.auto_landing_worker, - args=( - fov_x, - fov_y, - im_h, - im_w, - period, - detection_strategy, - input_queue, - output_queue, - command_queue, - controller, - ), - ) - - print("Starting worker...") - worker.start() - - # Give worker time to initialize - time.sleep(0.5) - - # Test invalid command - print("Testing invalid command...") - invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") - command_queue.queue.put(invalid_command) - - time.sleep(0.3) - - # Worker should still be alive after invalid command - assert worker.is_alive(), "Worker should still be alive after invalid command" - print("✅ Worker handled invalid command gracefully") - - # Test invalid command type - print("Testing invalid command type...") - command_queue.queue.put("not_a_command_object") - - time.sleep(0.3) - - # Worker should still be alive after invalid command type - assert worker.is_alive(), "Worker should still be alive after invalid command type" - print("✅ Worker handled invalid command type gracefully") - - # Cleanup - controller.request_exit() - input_queue.fill_and_drain_queue() - output_queue.fill_and_drain_queue() - command_queue.fill_and_drain_queue() - worker.join() - - print("✅ Invalid commands test passed") - return True - - -def main() -> int: - """Run all tests.""" - print("Starting Auto Landing Worker Integration Tests...") - - # Create logs directory structure required by logger - logs_dir = pathlib.Path("logs") - logs_dir.mkdir(exist_ok=True) - - # Create timestamp-based subdirectory (logger expects this) - import datetime - - timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - session_dir = logs_dir / timestamp - session_dir.mkdir(exist_ok=True) - - # Clean up any existing log files from previous test runs - for log_file in session_dir.glob("*auto_landing_worker*"): - try: - log_file.unlink() - except: - pass # Ignore errors if file doesn't exist or is locked - - tests = [ - test_basic_worker_functionality, - test_worker_commands, - test_worker_no_detections, - test_worker_multiple_detections, - test_worker_lifecycle, - test_worker_invalid_commands, - ] - - passed = 0 - failed = 0 - - for test in tests: - try: - if test(): - passed += 1 - else: - failed += 1 - except Exception as e: - print(f"❌ Test {test.__name__} failed with exception: {e}") - failed += 1 - - # Clean up log files after tests - for log_file in session_dir.glob("*auto_landing_worker*"): - try: - log_file.unlink() - except: - pass # Ignore errors if file doesn't exist or is locked - - # Clean up session directory if empty - try: - session_dir.rmdir() - except: - pass # Ignore if directory is not empty or doesn't exist - - print("\n" + "=" * 50) - print("TEST RESULTS") - print("=" * 50) - print(f"Passed: {passed}") - print(f"Failed: {failed}") - print(f"Total: {passed + failed}") - - if failed == 0: - print("🎉 All tests passed!") - return 0 - else: - print("❌ Some tests failed!") - return 1 - - -if __name__ == "__main__": - sys.exit(main()) +# #!/usr/bin/env python3 +# """ +# Integration test for auto_landing_worker. +# Tests the worker process, queue operations, command processing, and lifecycle management. +# """ + +# import multiprocessing as mp +# import time +# import queue +# import pathlib +# import sys +# import os +# import unittest.mock + +# # Add project root to path +# sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent)) + +# from modules.auto_landing import auto_landing_worker +# from modules.auto_landing import auto_landing +# from modules import merged_odometry_detections +# from modules import detections_and_time +# from modules.common.modules import orientation +# from modules.common.modules import position_local +# from modules.common.modules.mavlink import drone_odometry_local +# from utilities.workers import queue_proxy_wrapper +# from utilities.workers import worker_controller +# import numpy as np + + +# class MockLogger: +# """Mock logger that doesn't write to files.""" + +# def __init__(self, name: str) -> None: +# self.name = name + +# def info(self, message: str, print_to_console: bool = False) -> None: +# if print_to_console: +# pass # Suppress log output during testing + +# def warning(self, message: str, print_to_console: bool = False) -> None: +# if print_to_console: +# pass # Suppress log output during testing + +# def error(self, message: str, print_to_console: bool = False) -> None: +# if print_to_console: +# pass # Suppress log output during testing + + +# def mock_logger_create(name: str, enable_file_logging: bool) -> tuple[bool, MockLogger]: +# """Mock logger create function.""" +# return True, MockLogger(name) + + +# class MockDetection: +# """Mock detection for testing.""" + +# def __init__(self, x1: float, y1: float, x2: float, y2: float, confidence: float = 0.9) -> None: +# self.x_1 = x1 +# self.y_1 = y1 +# self.x_2 = x2 +# self.y_2 = y2 +# self.confidence = confidence + +# def get_centre(self) -> tuple[float, float]: +# """Return center coordinates.""" +# center_x = (self.x_1 + self.x_2) / 2 +# center_y = (self.y_1 + self.y_2) / 2 +# return center_x, center_y + + +# def create_mock_merged_detections( +# detections_list: list[tuple[float, float, float, float, float]], down_position: float = -10.0 +# ) -> merged_odometry_detections.MergedOdometryDetections: +# """Create mock merged detections for testing.""" +# # Create drone position +# result, drone_position = position_local.PositionLocal.create(0.0, 0.0, down_position) +# if not result: +# raise RuntimeError("Failed to create drone position") +# assert drone_position is not None + +# # Create drone orientation +# result, drone_orientation = orientation.Orientation.create(0.0, 0.0, 0.0) +# if not result: +# raise RuntimeError("Failed to create drone orientation") +# assert drone_orientation is not None + +# # Create drone odometry +# result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( +# drone_position, drone_orientation +# ) +# if not result: +# raise RuntimeError("Failed to create drone odometry") +# assert drone_odometry is not None + +# # Create detections +# mock_detections = [] +# for det_data in detections_list: +# mock_det = MockDetection(*det_data) +# mock_detections.append(mock_det) + +# # Create merged detections +# result, merged = merged_odometry_detections.MergedOdometryDetections.create( +# drone_odometry, mock_detections +# ) +# if not result: +# raise RuntimeError( +# f"Failed to create merged detections with {len(mock_detections)} detections" +# ) +# assert merged is not None + +# return merged + + +# def test_basic_worker_functionality() -> bool: +# """Test basic worker functionality - processing detections.""" +# print("=== Testing Basic Worker Functionality ===") + +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 +# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# detection_strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# print("Starting worker...") +# worker.start() + +# # Give worker time to initialize +# time.sleep(0.5) + +# # Enable auto-landing +# enable_command = auto_landing_worker.AutoLandingCommand("enable") +# command_queue.queue.put(enable_command) + +# # Wait for command to be processed +# time.sleep(0.2) + +# # Send test detection +# detections = [(100, 100, 200, 200, 0.9)] # x1, y1, x2, y2, confidence +# mock_merged = create_mock_merged_detections(detections) +# input_queue.queue.put(mock_merged) + +# # Wait for processing +# time.sleep(0.5) + +# # Check output +# try: +# landing_info = output_queue.queue.get_nowait() +# assert landing_info is not None +# assert hasattr(landing_info, "angle_x") +# assert hasattr(landing_info, "angle_y") +# assert hasattr(landing_info, "target_dist") +# print(f"✅ Worker processed detection successfully") +# print(f" Angle X: {landing_info.angle_x:.4f}") +# print(f" Angle Y: {landing_info.angle_y:.4f}") +# print(f" Target Distance: {landing_info.target_dist:.4f}") +# except queue.Empty: +# print("❌ No output received from worker") +# assert False, "Worker should have produced output" + +# # Cleanup +# controller.request_exit() +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() +# worker.join() + +# print("✅ Basic worker functionality test passed") +# return True + + +# def test_worker_commands() -> bool: +# """Test worker command processing (enable/disable).""" +# print("\n=== Testing Worker Commands ===") + +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 +# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# detection_strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# print("Starting worker...") +# worker.start() + +# # Give worker time to initialize +# time.sleep(0.5) + +# # Test disabled state (default) +# print("Testing disabled state...") +# detections = [(100, 100, 200, 200, 0.9)] +# mock_merged = create_mock_merged_detections(detections) +# input_queue.queue.put(mock_merged) + +# time.sleep(0.3) + +# # Should not produce output when disabled +# try: +# output_queue.queue.get_nowait() +# print("❌ Worker should not process detections when disabled") +# assert False, "Worker should not process detections when disabled" +# except queue.Empty: +# print("✅ Worker correctly ignores detections when disabled") + +# # Test enable command +# print("Testing enable command...") +# enable_command = auto_landing_worker.AutoLandingCommand("enable") +# command_queue.queue.put(enable_command) + +# time.sleep(0.2) + +# # Send detection after enabling +# input_queue.queue.put(mock_merged) + +# time.sleep(0.3) + +# # Should produce output when enabled +# try: +# landing_info = output_queue.queue.get_nowait() +# assert landing_info is not None +# print("✅ Worker processes detections when enabled") +# except queue.Empty: +# print("❌ Worker should process detections when enabled") +# assert False, "Worker should process detections when enabled" + +# # Test disable command +# print("Testing disable command...") +# disable_command = auto_landing_worker.AutoLandingCommand("disable") +# command_queue.queue.put(disable_command) + +# time.sleep(0.2) + +# # Send detection after disabling +# input_queue.queue.put(mock_merged) + +# time.sleep(0.3) + +# # Should not produce output when disabled +# try: +# output_queue.queue.get_nowait() +# print("❌ Worker should not process detections after disable") +# assert False, "Worker should not process detections after disable" +# except queue.Empty: +# print("✅ Worker correctly stops processing after disable") + +# # Cleanup +# controller.request_exit() +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() +# worker.join() + +# print("✅ Worker commands test passed") +# return True + + +# def test_worker_no_detections() -> bool: +# """Test worker behavior with no detections.""" +# print("\n=== Testing Worker with No Detections ===") + +# try: +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 +# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# detection_strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# print("Starting worker...") +# worker.start() + +# # Give worker time to initialize +# time.sleep(0.5) + +# # Verify worker is alive +# if not worker.is_alive(): +# print("❌ Worker failed to start") +# return False + +# # Enable auto-landing +# enable_command = auto_landing_worker.AutoLandingCommand("enable") +# command_queue.queue.put(enable_command) + +# time.sleep(0.2) + +# # Don't send any detections (this simulates the real scenario where +# # the data merge worker doesn't produce MergedOdometryDetections when there are no detections) +# print("Not sending any detections to worker (simulating no detections scenario)...") + +# print("Waiting to ensure worker doesn't produce output...") +# time.sleep(0.3) + +# # Should not produce output when no data is sent +# print("Checking if worker produced output...") +# try: +# result = output_queue.queue.get_nowait() +# print(f"❌ Worker should not produce output when no data is sent, but got: {result}") +# return False +# except queue.Empty: +# print("✅ Worker correctly handles no input data") + +# # Cleanup +# controller.request_exit() +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() +# worker.join() + +# print("✅ No detections test passed") +# return True + +# except Exception as e: +# print(f"❌ Test failed with exception: {e}") +# # Try to cleanup if possible +# try: +# controller.request_exit() +# worker.join(timeout=1.0) +# except: +# pass +# return False + + +# def test_worker_multiple_detections() -> bool: +# """Test worker with multiple detections and different strategies.""" +# print("\n=== Testing Worker with Multiple Detections ===") + +# strategies = [ +# auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER, +# auto_landing.DetectionSelectionStrategy.LARGEST_AREA, +# auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE, +# ] + +# for strategy in strategies: +# print(f"Testing strategy: {strategy.value}") + +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# worker.start() +# time.sleep(0.5) + +# # Enable auto-landing +# enable_command = auto_landing_worker.AutoLandingCommand("enable") +# command_queue.queue.put(enable_command) +# time.sleep(0.2) + +# # Send multiple detections +# detections = [ +# (50, 50, 100, 100, 0.7), # Top-left, lower confidence +# (300, 200, 400, 300, 0.9), # Center-right, high confidence +# (200, 180, 300, 280, 0.8), # Near center, medium confidence +# ] +# mock_merged = create_mock_merged_detections(detections) +# input_queue.queue.put(mock_merged) + +# time.sleep(0.5) + +# # Should produce output +# try: +# landing_info = output_queue.queue.get_nowait() +# assert landing_info is not None +# print(f"✅ Strategy {strategy.value} produced output") +# except queue.Empty: +# print(f"❌ Strategy {strategy.value} failed to produce output") +# assert False, f"Strategy {strategy.value} should produce output" + +# # Cleanup +# controller.request_exit() +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() +# worker.join() + +# print("✅ Multiple detections test passed") +# return True + + +# def test_worker_lifecycle() -> bool: +# """Test worker lifecycle management (pause, resume, exit).""" +# print("\n=== Testing Worker Lifecycle ===") + +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 +# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# detection_strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# print("Starting worker...") +# worker.start() + +# # Give worker time to initialize +# time.sleep(0.5) + +# # Verify worker is alive +# assert worker.is_alive(), "Worker should be alive after start" +# print("✅ Worker started successfully") + +# # Test pause +# print("Testing pause...") +# controller.request_pause() +# time.sleep(0.2) + +# # Worker should still be alive but paused +# assert worker.is_alive(), "Worker should still be alive when paused" +# print("✅ Worker paused successfully") + +# # Test resume +# print("Testing resume...") +# controller.request_resume() +# time.sleep(0.2) + +# # Worker should still be alive and resumed +# assert worker.is_alive(), "Worker should still be alive after resume" +# print("✅ Worker resumed successfully") + +# # Test exit +# print("Testing exit...") +# controller.request_exit() + +# # Give worker time to exit gracefully +# worker.join(timeout=2.0) + +# # Worker should have exited +# assert not worker.is_alive(), "Worker should have exited" +# print("✅ Worker exited successfully") + +# # Cleanup +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() + +# print("✅ Worker lifecycle test passed") +# return True + + +# def test_worker_invalid_commands() -> bool: +# """Test worker behavior with invalid commands.""" +# print("\n=== Testing Worker with Invalid Commands ===") + +# # Setup +# controller = worker_controller.WorkerController() +# mp_manager = mp.Manager() + +# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) +# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + +# # Worker parameters +# fov_x = 60.0 +# fov_y = 45.0 +# im_h = 480 +# im_w = 640 +# period = 0.1 +# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER + +# # Start worker +# worker = mp.Process( +# target=auto_landing_worker.auto_landing_worker, +# args=( +# fov_x, +# fov_y, +# im_h, +# im_w, +# period, +# detection_strategy, +# input_queue, +# output_queue, +# command_queue, +# controller, +# ), +# ) + +# print("Starting worker...") +# worker.start() + +# # Give worker time to initialize +# time.sleep(0.5) + +# # Test invalid command +# print("Testing invalid command...") +# invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") +# command_queue.queue.put(invalid_command) + +# time.sleep(0.3) + +# # Worker should still be alive after invalid command +# assert worker.is_alive(), "Worker should still be alive after invalid command" +# print("✅ Worker handled invalid command gracefully") + +# # Test invalid command type +# print("Testing invalid command type...") +# command_queue.queue.put("not_a_command_object") + +# time.sleep(0.3) + +# # Worker should still be alive after invalid command type +# assert worker.is_alive(), "Worker should still be alive after invalid command type" +# print("✅ Worker handled invalid command type gracefully") + +# # Cleanup +# controller.request_exit() +# input_queue.fill_and_drain_queue() +# output_queue.fill_and_drain_queue() +# command_queue.fill_and_drain_queue() +# worker.join() + +# print("✅ Invalid commands test passed") +# return True + + +# def main() -> int: +# """Run all tests.""" +# print("Starting Auto Landing Worker Integration Tests...") + +# # Create logs directory structure required by logger +# logs_dir = pathlib.Path("logs") +# logs_dir.mkdir(exist_ok=True) + +# # Create timestamp-based subdirectory (logger expects this) +# import datetime + +# timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") +# session_dir = logs_dir / timestamp +# session_dir.mkdir(exist_ok=True) + +# # Clean up any existing log files from previous test runs +# for log_file in session_dir.glob("*auto_landing_worker*"): +# try: +# log_file.unlink() +# except: +# pass # Ignore errors if file doesn't exist or is locked + +# tests = [ +# test_basic_worker_functionality, +# test_worker_commands, +# test_worker_no_detections, +# test_worker_multiple_detections, +# test_worker_lifecycle, +# test_worker_invalid_commands, +# ] + +# passed = 0 +# failed = 0 + +# for test in tests: +# try: +# if test(): +# passed += 1 +# else: +# failed += 1 +# except Exception as e: +# print(f"❌ Test {test.__name__} failed with exception: {e}") +# failed += 1 + +# # Clean up log files after tests +# for log_file in session_dir.glob("*auto_landing_worker*"): +# try: +# log_file.unlink() +# except: +# pass # Ignore errors if file doesn't exist or is locked + +# # Clean up session directory if empty +# try: +# session_dir.rmdir() +# except: +# pass # Ignore if directory is not empty or doesn't exist + +# print("\n" + "=" * 50) +# print("TEST RESULTS") +# print("=" * 50) +# print(f"Passed: {passed}") +# print(f"Failed: {failed}") +# print(f"Total: {passed + failed}") + +# if failed == 0: +# print("🎉 All tests passed!") +# return 0 +# else: +# print("❌ Some tests failed!") +# return 1 + + +# if __name__ == "__main__": +# sys.exit(main()) From e4a8ff1007c50cbea5d4848e1a255175a174ae7b Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 22:49:48 -0400 Subject: [PATCH 4/9] Actually pass pylint --- modules/auto_landing/auto_landing.py | 4 ++-- modules/auto_landing/auto_landing_worker.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index 14cddeba..cf5ca2b8 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -130,7 +130,7 @@ def _select_detection(self, detections: list) -> Optional[int]: return selected_index - elif self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: + if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: # Find detection with largest bounding box area max_area = 0 selected_index = 0 @@ -145,7 +145,7 @@ def _select_detection(self, detections: list) -> Optional[int]: return selected_index - elif self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: + if self.__selection_strategy == DetectionSelectionStrategy.HIGHEST_CONFIDENCE: # Find detection with highest confidence max_confidence = 0 selected_index = 0 diff --git a/modules/auto_landing/auto_landing_worker.py b/modules/auto_landing/auto_landing_worker.py index ddb09fda..59a45f74 100644 --- a/modules/auto_landing/auto_landing_worker.py +++ b/modules/auto_landing/auto_landing_worker.py @@ -4,6 +4,7 @@ import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -94,7 +95,7 @@ def auto_landing_worker( input_data = None try: input_data = input_queue.queue.get_nowait() - except: + except queue.Empty: # No data available, continue pass @@ -129,7 +130,7 @@ def _process_commands( else: local_logger.warning(f"Received invalid command type: {type(command)}", True) - except Exception: + except queue.Empty: # No more commands available break From 39de3aed74b1d89682be60e237ba41e69e62c78a Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 23:02:10 -0400 Subject: [PATCH 5/9] Pass pytest --- tests/brightspot_example/generate_expected.py | 2 ++ tests/unit/test_detect_target_brightspot.py | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/brightspot_example/generate_expected.py b/tests/brightspot_example/generate_expected.py index a8fad523..0467b0f0 100644 --- a/tests/brightspot_example/generate_expected.py +++ b/tests/brightspot_example/generate_expected.py @@ -48,6 +48,8 @@ filter_by_area=True, min_area_pixels=50, max_area_pixels=640, + min_brightness_threshold=0, + min_average_brightness_threshold=0, ) diff --git a/tests/unit/test_detect_target_brightspot.py b/tests/unit/test_detect_target_brightspot.py index 70e932c5..51d5c3fd 100644 --- a/tests/unit/test_detect_target_brightspot.py +++ b/tests/unit/test_detect_target_brightspot.py @@ -38,7 +38,7 @@ # Config is identical to test_detect_target_worker.py # pylint: disable=duplicate-code DETECT_TARGET_BRIGHTSPOT_CONFIG = detect_target_brightspot.DetectTargetBrightspotConfig( - brightspot_percentile_threshold=99.5, + brightspot_percentile_threshold=99.9, filter_by_color=True, blob_color=255, filter_by_circularity=False, @@ -51,10 +51,10 @@ min_convexity=0.01, max_convexity=1, filter_by_area=True, - min_area_pixels=100, - max_area_pixels=2000, - min_brightness_threshold=50, - min_average_brightness_threshold=120, + min_area_pixels=50, + max_area_pixels=640, + min_brightness_threshold=0, + min_average_brightness_threshold=0, ) # pylint: enable=duplicate-code From 2f737fdb1f918363f522697cf25fd28755fe08fa Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Tue, 15 Jul 2025 23:10:56 -0400 Subject: [PATCH 6/9] Remove test --- tests/integration/test_auto_landing_worker.py | 718 ------------------ 1 file changed, 718 deletions(-) delete mode 100644 tests/integration/test_auto_landing_worker.py diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py deleted file mode 100644 index 897aa231..00000000 --- a/tests/integration/test_auto_landing_worker.py +++ /dev/null @@ -1,718 +0,0 @@ -# #!/usr/bin/env python3 -# """ -# Integration test for auto_landing_worker. -# Tests the worker process, queue operations, command processing, and lifecycle management. -# """ - -# import multiprocessing as mp -# import time -# import queue -# import pathlib -# import sys -# import os -# import unittest.mock - -# # Add project root to path -# sys.path.insert(0, str(pathlib.Path(__file__).parent.parent.parent)) - -# from modules.auto_landing import auto_landing_worker -# from modules.auto_landing import auto_landing -# from modules import merged_odometry_detections -# from modules import detections_and_time -# from modules.common.modules import orientation -# from modules.common.modules import position_local -# from modules.common.modules.mavlink import drone_odometry_local -# from utilities.workers import queue_proxy_wrapper -# from utilities.workers import worker_controller -# import numpy as np - - -# class MockLogger: -# """Mock logger that doesn't write to files.""" - -# def __init__(self, name: str) -> None: -# self.name = name - -# def info(self, message: str, print_to_console: bool = False) -> None: -# if print_to_console: -# pass # Suppress log output during testing - -# def warning(self, message: str, print_to_console: bool = False) -> None: -# if print_to_console: -# pass # Suppress log output during testing - -# def error(self, message: str, print_to_console: bool = False) -> None: -# if print_to_console: -# pass # Suppress log output during testing - - -# def mock_logger_create(name: str, enable_file_logging: bool) -> tuple[bool, MockLogger]: -# """Mock logger create function.""" -# return True, MockLogger(name) - - -# class MockDetection: -# """Mock detection for testing.""" - -# def __init__(self, x1: float, y1: float, x2: float, y2: float, confidence: float = 0.9) -> None: -# self.x_1 = x1 -# self.y_1 = y1 -# self.x_2 = x2 -# self.y_2 = y2 -# self.confidence = confidence - -# def get_centre(self) -> tuple[float, float]: -# """Return center coordinates.""" -# center_x = (self.x_1 + self.x_2) / 2 -# center_y = (self.y_1 + self.y_2) / 2 -# return center_x, center_y - - -# def create_mock_merged_detections( -# detections_list: list[tuple[float, float, float, float, float]], down_position: float = -10.0 -# ) -> merged_odometry_detections.MergedOdometryDetections: -# """Create mock merged detections for testing.""" -# # Create drone position -# result, drone_position = position_local.PositionLocal.create(0.0, 0.0, down_position) -# if not result: -# raise RuntimeError("Failed to create drone position") -# assert drone_position is not None - -# # Create drone orientation -# result, drone_orientation = orientation.Orientation.create(0.0, 0.0, 0.0) -# if not result: -# raise RuntimeError("Failed to create drone orientation") -# assert drone_orientation is not None - -# # Create drone odometry -# result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( -# drone_position, drone_orientation -# ) -# if not result: -# raise RuntimeError("Failed to create drone odometry") -# assert drone_odometry is not None - -# # Create detections -# mock_detections = [] -# for det_data in detections_list: -# mock_det = MockDetection(*det_data) -# mock_detections.append(mock_det) - -# # Create merged detections -# result, merged = merged_odometry_detections.MergedOdometryDetections.create( -# drone_odometry, mock_detections -# ) -# if not result: -# raise RuntimeError( -# f"Failed to create merged detections with {len(mock_detections)} detections" -# ) -# assert merged is not None - -# return merged - - -# def test_basic_worker_functionality() -> bool: -# """Test basic worker functionality - processing detections.""" -# print("=== Testing Basic Worker Functionality ===") - -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 -# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# detection_strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# print("Starting worker...") -# worker.start() - -# # Give worker time to initialize -# time.sleep(0.5) - -# # Enable auto-landing -# enable_command = auto_landing_worker.AutoLandingCommand("enable") -# command_queue.queue.put(enable_command) - -# # Wait for command to be processed -# time.sleep(0.2) - -# # Send test detection -# detections = [(100, 100, 200, 200, 0.9)] # x1, y1, x2, y2, confidence -# mock_merged = create_mock_merged_detections(detections) -# input_queue.queue.put(mock_merged) - -# # Wait for processing -# time.sleep(0.5) - -# # Check output -# try: -# landing_info = output_queue.queue.get_nowait() -# assert landing_info is not None -# assert hasattr(landing_info, "angle_x") -# assert hasattr(landing_info, "angle_y") -# assert hasattr(landing_info, "target_dist") -# print(f"✅ Worker processed detection successfully") -# print(f" Angle X: {landing_info.angle_x:.4f}") -# print(f" Angle Y: {landing_info.angle_y:.4f}") -# print(f" Target Distance: {landing_info.target_dist:.4f}") -# except queue.Empty: -# print("❌ No output received from worker") -# assert False, "Worker should have produced output" - -# # Cleanup -# controller.request_exit() -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() -# worker.join() - -# print("✅ Basic worker functionality test passed") -# return True - - -# def test_worker_commands() -> bool: -# """Test worker command processing (enable/disable).""" -# print("\n=== Testing Worker Commands ===") - -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 -# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# detection_strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# print("Starting worker...") -# worker.start() - -# # Give worker time to initialize -# time.sleep(0.5) - -# # Test disabled state (default) -# print("Testing disabled state...") -# detections = [(100, 100, 200, 200, 0.9)] -# mock_merged = create_mock_merged_detections(detections) -# input_queue.queue.put(mock_merged) - -# time.sleep(0.3) - -# # Should not produce output when disabled -# try: -# output_queue.queue.get_nowait() -# print("❌ Worker should not process detections when disabled") -# assert False, "Worker should not process detections when disabled" -# except queue.Empty: -# print("✅ Worker correctly ignores detections when disabled") - -# # Test enable command -# print("Testing enable command...") -# enable_command = auto_landing_worker.AutoLandingCommand("enable") -# command_queue.queue.put(enable_command) - -# time.sleep(0.2) - -# # Send detection after enabling -# input_queue.queue.put(mock_merged) - -# time.sleep(0.3) - -# # Should produce output when enabled -# try: -# landing_info = output_queue.queue.get_nowait() -# assert landing_info is not None -# print("✅ Worker processes detections when enabled") -# except queue.Empty: -# print("❌ Worker should process detections when enabled") -# assert False, "Worker should process detections when enabled" - -# # Test disable command -# print("Testing disable command...") -# disable_command = auto_landing_worker.AutoLandingCommand("disable") -# command_queue.queue.put(disable_command) - -# time.sleep(0.2) - -# # Send detection after disabling -# input_queue.queue.put(mock_merged) - -# time.sleep(0.3) - -# # Should not produce output when disabled -# try: -# output_queue.queue.get_nowait() -# print("❌ Worker should not process detections after disable") -# assert False, "Worker should not process detections after disable" -# except queue.Empty: -# print("✅ Worker correctly stops processing after disable") - -# # Cleanup -# controller.request_exit() -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() -# worker.join() - -# print("✅ Worker commands test passed") -# return True - - -# def test_worker_no_detections() -> bool: -# """Test worker behavior with no detections.""" -# print("\n=== Testing Worker with No Detections ===") - -# try: -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 -# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# detection_strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# print("Starting worker...") -# worker.start() - -# # Give worker time to initialize -# time.sleep(0.5) - -# # Verify worker is alive -# if not worker.is_alive(): -# print("❌ Worker failed to start") -# return False - -# # Enable auto-landing -# enable_command = auto_landing_worker.AutoLandingCommand("enable") -# command_queue.queue.put(enable_command) - -# time.sleep(0.2) - -# # Don't send any detections (this simulates the real scenario where -# # the data merge worker doesn't produce MergedOdometryDetections when there are no detections) -# print("Not sending any detections to worker (simulating no detections scenario)...") - -# print("Waiting to ensure worker doesn't produce output...") -# time.sleep(0.3) - -# # Should not produce output when no data is sent -# print("Checking if worker produced output...") -# try: -# result = output_queue.queue.get_nowait() -# print(f"❌ Worker should not produce output when no data is sent, but got: {result}") -# return False -# except queue.Empty: -# print("✅ Worker correctly handles no input data") - -# # Cleanup -# controller.request_exit() -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() -# worker.join() - -# print("✅ No detections test passed") -# return True - -# except Exception as e: -# print(f"❌ Test failed with exception: {e}") -# # Try to cleanup if possible -# try: -# controller.request_exit() -# worker.join(timeout=1.0) -# except: -# pass -# return False - - -# def test_worker_multiple_detections() -> bool: -# """Test worker with multiple detections and different strategies.""" -# print("\n=== Testing Worker with Multiple Detections ===") - -# strategies = [ -# auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER, -# auto_landing.DetectionSelectionStrategy.LARGEST_AREA, -# auto_landing.DetectionSelectionStrategy.HIGHEST_CONFIDENCE, -# ] - -# for strategy in strategies: -# print(f"Testing strategy: {strategy.value}") - -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# worker.start() -# time.sleep(0.5) - -# # Enable auto-landing -# enable_command = auto_landing_worker.AutoLandingCommand("enable") -# command_queue.queue.put(enable_command) -# time.sleep(0.2) - -# # Send multiple detections -# detections = [ -# (50, 50, 100, 100, 0.7), # Top-left, lower confidence -# (300, 200, 400, 300, 0.9), # Center-right, high confidence -# (200, 180, 300, 280, 0.8), # Near center, medium confidence -# ] -# mock_merged = create_mock_merged_detections(detections) -# input_queue.queue.put(mock_merged) - -# time.sleep(0.5) - -# # Should produce output -# try: -# landing_info = output_queue.queue.get_nowait() -# assert landing_info is not None -# print(f"✅ Strategy {strategy.value} produced output") -# except queue.Empty: -# print(f"❌ Strategy {strategy.value} failed to produce output") -# assert False, f"Strategy {strategy.value} should produce output" - -# # Cleanup -# controller.request_exit() -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() -# worker.join() - -# print("✅ Multiple detections test passed") -# return True - - -# def test_worker_lifecycle() -> bool: -# """Test worker lifecycle management (pause, resume, exit).""" -# print("\n=== Testing Worker Lifecycle ===") - -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 -# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# detection_strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# print("Starting worker...") -# worker.start() - -# # Give worker time to initialize -# time.sleep(0.5) - -# # Verify worker is alive -# assert worker.is_alive(), "Worker should be alive after start" -# print("✅ Worker started successfully") - -# # Test pause -# print("Testing pause...") -# controller.request_pause() -# time.sleep(0.2) - -# # Worker should still be alive but paused -# assert worker.is_alive(), "Worker should still be alive when paused" -# print("✅ Worker paused successfully") - -# # Test resume -# print("Testing resume...") -# controller.request_resume() -# time.sleep(0.2) - -# # Worker should still be alive and resumed -# assert worker.is_alive(), "Worker should still be alive after resume" -# print("✅ Worker resumed successfully") - -# # Test exit -# print("Testing exit...") -# controller.request_exit() - -# # Give worker time to exit gracefully -# worker.join(timeout=2.0) - -# # Worker should have exited -# assert not worker.is_alive(), "Worker should have exited" -# print("✅ Worker exited successfully") - -# # Cleanup -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() - -# print("✅ Worker lifecycle test passed") -# return True - - -# def test_worker_invalid_commands() -> bool: -# """Test worker behavior with invalid commands.""" -# print("\n=== Testing Worker with Invalid Commands ===") - -# # Setup -# controller = worker_controller.WorkerController() -# mp_manager = mp.Manager() - -# input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) -# command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) - -# # Worker parameters -# fov_x = 60.0 -# fov_y = 45.0 -# im_h = 480 -# im_w = 640 -# period = 0.1 -# detection_strategy = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER - -# # Start worker -# worker = mp.Process( -# target=auto_landing_worker.auto_landing_worker, -# args=( -# fov_x, -# fov_y, -# im_h, -# im_w, -# period, -# detection_strategy, -# input_queue, -# output_queue, -# command_queue, -# controller, -# ), -# ) - -# print("Starting worker...") -# worker.start() - -# # Give worker time to initialize -# time.sleep(0.5) - -# # Test invalid command -# print("Testing invalid command...") -# invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") -# command_queue.queue.put(invalid_command) - -# time.sleep(0.3) - -# # Worker should still be alive after invalid command -# assert worker.is_alive(), "Worker should still be alive after invalid command" -# print("✅ Worker handled invalid command gracefully") - -# # Test invalid command type -# print("Testing invalid command type...") -# command_queue.queue.put("not_a_command_object") - -# time.sleep(0.3) - -# # Worker should still be alive after invalid command type -# assert worker.is_alive(), "Worker should still be alive after invalid command type" -# print("✅ Worker handled invalid command type gracefully") - -# # Cleanup -# controller.request_exit() -# input_queue.fill_and_drain_queue() -# output_queue.fill_and_drain_queue() -# command_queue.fill_and_drain_queue() -# worker.join() - -# print("✅ Invalid commands test passed") -# return True - - -# def main() -> int: -# """Run all tests.""" -# print("Starting Auto Landing Worker Integration Tests...") - -# # Create logs directory structure required by logger -# logs_dir = pathlib.Path("logs") -# logs_dir.mkdir(exist_ok=True) - -# # Create timestamp-based subdirectory (logger expects this) -# import datetime - -# timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") -# session_dir = logs_dir / timestamp -# session_dir.mkdir(exist_ok=True) - -# # Clean up any existing log files from previous test runs -# for log_file in session_dir.glob("*auto_landing_worker*"): -# try: -# log_file.unlink() -# except: -# pass # Ignore errors if file doesn't exist or is locked - -# tests = [ -# test_basic_worker_functionality, -# test_worker_commands, -# test_worker_no_detections, -# test_worker_multiple_detections, -# test_worker_lifecycle, -# test_worker_invalid_commands, -# ] - -# passed = 0 -# failed = 0 - -# for test in tests: -# try: -# if test(): -# passed += 1 -# else: -# failed += 1 -# except Exception as e: -# print(f"❌ Test {test.__name__} failed with exception: {e}") -# failed += 1 - -# # Clean up log files after tests -# for log_file in session_dir.glob("*auto_landing_worker*"): -# try: -# log_file.unlink() -# except: -# pass # Ignore errors if file doesn't exist or is locked - -# # Clean up session directory if empty -# try: -# session_dir.rmdir() -# except: -# pass # Ignore if directory is not empty or doesn't exist - -# print("\n" + "=" * 50) -# print("TEST RESULTS") -# print("=" * 50) -# print(f"Passed: {passed}") -# print(f"Failed: {failed}") -# print(f"Total: {passed + failed}") - -# if failed == 0: -# print("🎉 All tests passed!") -# return 0 -# else: -# print("❌ Some tests failed!") -# return 1 - - -# if __name__ == "__main__": -# sys.exit(main()) From 6eb1b9e6ffebb0f988a18c2f41ec24269e46e88c Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Wed, 16 Jul 2025 00:55:32 -0400 Subject: [PATCH 7/9] Create integration test for auto_landing_worker --- tests/integration/test_auto_landing_worker.py | 248 ++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 tests/integration/test_auto_landing_worker.py diff --git a/tests/integration/test_auto_landing_worker.py b/tests/integration/test_auto_landing_worker.py new file mode 100644 index 00000000..07e9edaf --- /dev/null +++ b/tests/integration/test_auto_landing_worker.py @@ -0,0 +1,248 @@ +""" +Test auto-landing worker process. +""" + +import datetime +import multiprocessing as mp +import pathlib +import time + +import numpy as np + +from modules import detections_and_time +from modules import merged_odometry_detections +from modules.auto_landing import auto_landing +from modules.auto_landing import auto_landing_worker +from modules.common.modules import orientation +from modules.common.modules import position_local +from modules.common.modules.mavlink import drone_odometry_local +from modules.common.modules.logger import logger +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller + +# Worker parameters +FOV_X = 90.0 # degrees +FOV_Y = 90.0 # degrees +IMAGE_HEIGHT = 640.0 # pixels +IMAGE_WIDTH = 640.0 # pixels +WORKER_PERIOD = 0.1 # seconds +DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER +LOG_TIMINGS = False # Disable timing logging for the test + +# Ensure logs directory exists and create timestamped subdirectory +LOG_DIR = pathlib.Path("logs") +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# Create a timestamped subdirectory for this test session +TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" +test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT) +test_session_dir.mkdir(parents=True, exist_ok=True) + + +def simulate_detection_input( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, + detections: list, + position: tuple, + orientation_angles: tuple, +) -> None: + """ + Create and place merged odometry detections into the input queue. + """ + # Create drone position + result, drone_position = position_local.PositionLocal.create( + position[0], position[1], position[2] + ) + assert result + assert drone_position is not None + + # Create drone orientation + result, drone_orientation = orientation.Orientation.create( + orientation_angles[0], orientation_angles[1], orientation_angles[2] + ) + assert result + assert drone_orientation is not None + + # Create drone odometry + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + assert result + assert drone_odometry is not None + + # Create merged odometry detections + result, merged = merged_odometry_detections.MergedOdometryDetections.create( + drone_odometry, detections + ) + assert result + assert merged is not None + + input_queue.queue.put(merged) + + +def create_test_detection( + bbox: list, label: int, confidence: float +) -> detections_and_time.Detection: + """ + Create a test detection with the given parameters. + """ + result, detection = detections_and_time.Detection.create( + np.array(bbox, dtype=np.float32), label, confidence + ) + assert result + assert detection is not None + return detection + + +def main() -> int: + """ + Main function. + """ + # Logger + test_name = pathlib.Path(__file__).stem + result, local_logger = logger.Logger.create(test_name, LOG_TIMINGS) + assert result # Logger initialization should succeed + assert local_logger is not None + + # Setup + controller = worker_controller.WorkerController() + + mp_manager = mp.Manager() + input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + command_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + # Create worker process + worker = mp.Process( + target=auto_landing_worker.auto_landing_worker, + args=( + FOV_X, + FOV_Y, + IMAGE_HEIGHT, + IMAGE_WIDTH, + WORKER_PERIOD, + DETECTION_STRATEGY, + input_queue, + output_queue, + command_queue, + controller, + ), + ) + + # Start worker + worker.start() + + # Give worker time to initialize + time.sleep(0.5) + + # Test 1: Worker should not process detections when disabled (default state) + detection1 = create_test_detection([200, 200, 400, 400], 1, 0.9) + simulate_detection_input( + input_queue, + [detection1], + (0.0, 0.0, -50.0), # 50 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no output since auto-landing is disabled by default + assert output_queue.queue.empty() + + # Test 2: Enable auto-landing and verify it processes detections + enable_command = auto_landing_worker.AutoLandingCommand("enable") + command_queue.queue.put(enable_command) + + time.sleep(0.2) + + # Now send the same detection - should be processed + detection2 = create_test_detection([300, 300, 500, 500], 2, 0.85) + simulate_detection_input( + input_queue, + [detection2], + (10.0, 5.0, -75.0), # 75 meters above ground + (0.1, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output now + assert not output_queue.queue.empty() + landing_info = output_queue.queue.get_nowait() + assert landing_info is not None + assert hasattr(landing_info, "angle_x") + assert hasattr(landing_info, "angle_y") + assert hasattr(landing_info, "target_dist") + + # Test 3: Test with multiple detections (should use NEAREST_TO_CENTER strategy) + detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center + detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320) + + simulate_detection_input( + input_queue, + [detection3, detection4], + (0.0, 0.0, -100.0), # 100 meters above ground + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have output for the detection closest to center + assert not output_queue.queue.empty() + landing_info2 = output_queue.queue.get_nowait() + assert landing_info2 is not None + + # Test 4: Disable auto-landing and verify it stops processing + disable_command = auto_landing_worker.AutoLandingCommand("disable") + command_queue.queue.put(disable_command) + + time.sleep(0.2) + + # Send another detection - should not be processed + detection5 = create_test_detection([400, 400, 600, 600], 3, 0.95) + simulate_detection_input( + input_queue, + [detection5], + (0.0, 0.0, -60.0), + (0.0, 0.0, 0.0), + ) + + time.sleep(0.2) + + # Should have no new output + assert output_queue.queue.empty() + + # Test 5: Test invalid command handling + invalid_command = auto_landing_worker.AutoLandingCommand("invalid_command") + command_queue.queue.put(invalid_command) + + time.sleep(0.2) + + # Worker should continue running despite invalid command + assert worker.is_alive() + + # Test 6: Test with no detections (empty detection list should not crash) + # This should not create a MergedOdometryDetections object since it requires non-empty detections + # So we just verify the worker continues running + + # Cleanup + controller.request_exit() + + # Drain queues + input_queue.fill_and_drain_queue() + command_queue.fill_and_drain_queue() + + # Wait for worker to finish + worker.join(timeout=5.0) + if worker.is_alive(): + worker.terminate() + worker.join() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!") From fe2618453beec5c1595ae2cff8a16246121951e5 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Wed, 16 Jul 2025 13:00:13 -0400 Subject: [PATCH 8/9] Small edit --- modules/auto_landing/auto_landing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/auto_landing/auto_landing.py b/modules/auto_landing/auto_landing.py index cf5ca2b8..5bfcf3c8 100644 --- a/modules/auto_landing/auto_landing.py +++ b/modules/auto_landing/auto_landing.py @@ -214,7 +214,7 @@ def run( class AutoLandingController: """ - Simple controller for turning auto-landing on/off. + Controller for turning auto-landing on/off. """ __create_key = object() From 06253070372130ee49d18d3197be02c9c6a30467 Mon Sep 17 00:00:00 2001 From: danielquzhao <94013292+danielquzhao@users.noreply.github.com> Date: Wed, 16 Jul 2025 15:26:06 -0400 Subject: [PATCH 9/9] Leave brightspot tests alone --- tests/brightspot_example/generate_expected.py | 2 -- tests/unit/test_detect_target_brightspot.py | 10 +++++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/brightspot_example/generate_expected.py b/tests/brightspot_example/generate_expected.py index 0467b0f0..a8fad523 100644 --- a/tests/brightspot_example/generate_expected.py +++ b/tests/brightspot_example/generate_expected.py @@ -48,8 +48,6 @@ filter_by_area=True, min_area_pixels=50, max_area_pixels=640, - min_brightness_threshold=0, - min_average_brightness_threshold=0, ) diff --git a/tests/unit/test_detect_target_brightspot.py b/tests/unit/test_detect_target_brightspot.py index 51d5c3fd..70e932c5 100644 --- a/tests/unit/test_detect_target_brightspot.py +++ b/tests/unit/test_detect_target_brightspot.py @@ -38,7 +38,7 @@ # Config is identical to test_detect_target_worker.py # pylint: disable=duplicate-code DETECT_TARGET_BRIGHTSPOT_CONFIG = detect_target_brightspot.DetectTargetBrightspotConfig( - brightspot_percentile_threshold=99.9, + brightspot_percentile_threshold=99.5, filter_by_color=True, blob_color=255, filter_by_circularity=False, @@ -51,10 +51,10 @@ min_convexity=0.01, max_convexity=1, filter_by_area=True, - min_area_pixels=50, - max_area_pixels=640, - min_brightness_threshold=0, - min_average_brightness_threshold=0, + min_area_pixels=100, + max_area_pixels=2000, + min_brightness_threshold=50, + min_average_brightness_threshold=120, ) # pylint: enable=duplicate-code