diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index ffa799142..000000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(find:*)", - "Bash(jj log:*)", - "Bash(jj status:*)", - "Bash(jj show:*)", - "Bash(jj file list:*)", - "Bash(ls:*)", - "Bash(git checkout:*)", - "Bash(git pull:*)", - "Bash(jj branch:*)", - "Bash(jj git fetch:*)", - "Bash(jj help:*)", - "Bash(jj bookmark:*)", - "Bash(jj describe:*)", - "Bash(jj rebase:*)", - "WebFetch(domain:localhost)", - "mcp__playwright__browser_navigate", - "mcp__playwright__browser_type", - "mcp__playwright__browser_click", - "mcp__playwright__browser_take_screenshot", - "mcp__playwright__browser_snapshot", - "mcp__playwright__browser_press_key", - "mcp__playwright__browser_close", - "Bash(pkill:*)", - "Bash(source:*)", - "Bash(python:*)", - "Bash(pip install:*)", - "Bash(timeout 10 python:*)", - "Bash(grep:*)", - "Bash(nox:*)", - "Bash(# Navigate to python dir and set up venv\nPYTHON_DIR=\"\"/Users/mike/dev/amateur_astro/myPiFinder/wt-radec/python\"\"\ncd \"\"$PYTHON_DIR\"\"\npython3.9 -m venv .venv\nsource .venv/bin/activate\npip install -r requirements.txt\npip install -r requirements_dev.txt)", - "Bash(PYTHON_DIR=\"/Users/mike/dev/amateur_astro/myPiFinder/wt-radec/python\")", - "Bash(pytest:*)", - "Bash(sed:*)", - ], - "deny": [] - } -} diff --git a/python/PiFinder/auto_exposure.py b/python/PiFinder/auto_exposure.py index ea6d05afc..56a8d5e55 100644 --- a/python/PiFinder/auto_exposure.py +++ b/python/PiFinder/auto_exposure.py @@ -9,11 +9,9 @@ The controller targets 17 matched stars (acceptable range: 12-22) which provides reliable plate solving while avoiding over-saturation and maintaining good performance. -Rate limiting on downward adjustments reduces CPU usage. """ import logging -import time from abc import ABC, abstractmethod from typing import List, Optional @@ -121,10 +119,10 @@ def __init__( self._repeat_count = 0 # Sweep pattern: exposure values in microseconds - # Uses doubling pattern (2x each step) + # Start at 400ms (reasonable middle), sweep up, then try shorter exposures # Note: This is intentionally NOT using generate_exposure_sweep() because - # it uses a specific doubling pattern - self._exposures = [25000, 50000, 100000, 200000, 400000, 800000, 1000000] + # it uses a specific pattern optimized for recovery + self._exposures = [400000, 800000, 1000000, 200000, 100000, 50000, 25000] self._repeats_per_exposure = 2 # Try each exposure 2 times logger.info( @@ -619,6 +617,183 @@ def reset(self) -> None: logger.debug("HistogramZeroStarHandler reset") +class ExposureSNRController: + """ + SNR-based auto exposure for SQM measurements. + + Targets a minimum background SNR and exposure time instead of star count. + This provides more stable, longer exposures that are better for accurate + SQM measurements compared to the histogram-based approach. + + Strategy: + - Target specific background level above noise floor + - Derive thresholds from camera profile (bit depth, bias offset) + - Slower adjustments for stability + """ + + def __init__( + self, + min_exposure: int = 10000, # 10ms minimum + max_exposure: int = 1000000, # 1.0s maximum + target_background: int = 30, # Target background level in ADU + min_background: int = 15, # Minimum acceptable background + max_background: int = 100, # Maximum before saturating + adjustment_factor: float = 1.3, # Gentle adjustments (30% steps) + ): + """ + Initialize SNR-based auto exposure. 
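# A minimal standalone sketch of how the gentle multiplicative steps described above
# behave if background scales roughly linearly with exposure time. The ADU-per-ms rate
# and band limits below are assumed values, not measured camera characteristics.
def simulate_snr_convergence(start_exposure_us=25_000, adu_per_ms=0.02,
                             min_bg=17.0, max_bg=100.0, factor=1.3,
                             min_exp=10_000, max_exp=1_000_000):
    """Step exposure up/down by `factor` until background lands in [min_bg, max_bg]."""
    exposure = start_exposure_us
    for _ in range(50):  # bounded loop; the real controller steps once per frame
        background = adu_per_ms * (exposure / 1000.0)  # assumed linear response
        if background < min_bg:
            exposure = min(max_exp, int(exposure * factor))   # too dark: lengthen
        elif background > max_bg:
            exposure = max(min_exp, int(exposure / factor))   # too bright: shorten
        else:
            break  # in band: the controller would return None (no change)
    return exposure

print(simulate_snr_convergence())  # settles just under 1 s for these assumed numbers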
+ + Args: + min_exposure: Minimum exposure in microseconds (default 10ms) + max_exposure: Maximum exposure in microseconds (default 1000ms) + target_background: Target median background level in ADU + min_background: Minimum acceptable background (increase if below) + max_background: Maximum acceptable background (decrease if above) + adjustment_factor: Multiplicative adjustment step (default 1.3 = 30%) + """ + self.min_exposure = min_exposure + self.max_exposure = max_exposure + self.target_background = target_background + self.min_background = min_background + self.max_background = max_background + self.adjustment_factor = adjustment_factor + + logger.info( + f"AutoExposure SNR: target_bg={target_background}, " + f"range=[{min_background}, {max_background}] ADU, " + f"exp_range=[{min_exposure/1000:.0f}, {max_exposure/1000:.0f}]ms, " + f"adjustment={adjustment_factor}x" + ) + + @classmethod + def from_camera_profile( + cls, + camera_type: str, + min_exposure: int = 10000, + max_exposure: int = 1000000, + adjustment_factor: float = 1.3, + ) -> "ExposureSNRController": + """ + Create controller with thresholds derived from camera profile. + + Calculates min/target/max background based on bit depth and bias offset. + + Args: + camera_type: Camera type (e.g., "imx296_processed", "imx462_processed") + min_exposure: Minimum exposure in microseconds + max_exposure: Maximum exposure in microseconds + adjustment_factor: Multiplicative adjustment step + + Returns: + ExposureSNRController configured for the camera + """ + from PiFinder.sqm.camera_profiles import get_camera_profile + + profile = get_camera_profile(camera_type) + + # Derive thresholds from camera specs + max_adu = (2 ** profile.bit_depth) - 1 + bias = profile.bias_offset + + # min_background: bias + margin (2x bias or bias + 8, whichever larger) + min_background = int(max(bias * 2, bias + 8)) + + # max_background: ~40% of full range (avoid saturation/nonlinearity) + max_background = int(max_adu * 0.4) + + # target_background: just above min (lower = shorter exposure = more linear response) + target_background = min_background + 2 + + logger.info( + f"SNR controller from {camera_type}: " + f"bit_depth={profile.bit_depth}, bias={bias:.0f}, " + f"thresholds=[{min_background}, {target_background}, {max_background}]" + ) + + return cls( + min_exposure=min_exposure, + max_exposure=max_exposure, + target_background=target_background, + min_background=min_background, + max_background=max_background, + adjustment_factor=adjustment_factor, + ) + + def update( + self, + current_exposure: int, + image: Image.Image, + noise_floor: Optional[float] = None, + **kwargs # Ignore other params (matched_stars, etc.) + ) -> Optional[int]: + """ + Update exposure based on background level. 
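# Worked example of the threshold derivation in from_camera_profile() above, using an
# 8-bit processed profile with the bias_offset of 6.0 ADU set elsewhere in this diff.
bit_depth = 8
bias = 6.0

max_adu = (2 ** bit_depth) - 1                  # 255 for 8-bit
min_background = int(max(bias * 2, bias + 8))   # max(12, 14) = 14 ADU
max_background = int(max_adu * 0.4)             # int(102.0) = 102 ADU
target_background = min_background + 2          # 16 ADU

print(min_background, target_background, max_background)  # 14 16 102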
+ + Args: + current_exposure: Current exposure in microseconds + image: Current image for analysis + noise_floor: Adaptive noise floor from SQM calculator (if available) + **kwargs: Ignored (for compatibility with PID interface) + + Returns: + New exposure in microseconds, or None if no change needed + """ + # Use adaptive noise floor if available, otherwise fall back to static config + # Need margin above noise floor so background_corrected isn't near zero + if noise_floor is not None: + min_bg = noise_floor + 2 + else: + min_bg = self.min_background + + # Analyze image + if image.mode != "L": + image = image.convert("L") + img_array = np.asarray(image, dtype=np.float32) + + # Use 10th percentile as background estimate (dark pixels) + background = float(np.percentile(img_array, 10)) + + logger.debug( + f"SNR AE: bg={background:.1f}, min={min_bg:.1f} ADU, exp={current_exposure/1000:.0f}ms" + ) + + # Determine adjustment + new_exposure = None + + if background < min_bg: + # Too dark - increase exposure + new_exposure = int(current_exposure * self.adjustment_factor) + logger.info( + f"SNR AE: Background too low ({background:.1f} < {min_bg:.1f}), " + f"increasing exposure {current_exposure/1000:.0f}ms → {new_exposure/1000:.0f}ms" + ) + elif background > self.max_background: + # Too bright - decrease exposure + new_exposure = int(current_exposure / self.adjustment_factor) + logger.info( + f"SNR AE: Background too high ({background:.1f} > {self.max_background}), " + f"decreasing exposure {current_exposure/1000:.0f}ms → {new_exposure/1000:.0f}ms" + ) + else: + # Background is in acceptable range + logger.debug(f"SNR AE: OK (bg={background:.1f} ADU)") + return None + + # Clamp to limits + new_exposure = max(self.min_exposure, min(self.max_exposure, new_exposure)) + return new_exposure + + def get_status(self) -> dict: + return { + "mode": "SNR", + "target_background": self.target_background, + "min_background": self.min_background, + "max_background": self.max_background, + "min_exposure": self.min_exposure, + "max_exposure": self.max_exposure, + } + + class ExposurePIDController: """ PID controller for automatic camera exposure adjustment. @@ -631,56 +806,37 @@ class ExposurePIDController: def __init__( self, target_stars: int = 17, - gains_decrease: tuple = ( - 500.0, # Kp: Conservative (was 2000, reduced 75% to prevent crash) - 5.0, # Ki: Minimal (was 10, reduced to prevent drift) - 250.0, # Kd: Proportional (was 750, reduced 67%) - ), # Kp, Ki, Kd for too many stars (conservative descent) - gains_increase: tuple = ( - 4000.0, # Kp: Moderate aggression (was 8000, reduced 50%) - 250.0, # Ki: Moderate (was 500, reduced 50%) - 1500.0, # Kd: Moderate (was 3000, reduced 50%) - ), # Kp, Ki, Kd for too few stars (faster ascent) + gains: tuple = (2000.0, 100.0, 750.0), # Kp, Ki, Kd min_exposure: int = 25000, max_exposure: int = 1000000, deadband: int = 5, - update_interval: float = 0.5, # Minimum seconds between decreasing adjustments zero_star_handler: Optional[ZeroStarHandler] = None, ): - """ - Initialize PID controller with asymmetric gains. - - Uses conservative gains when decreasing exposure (gentle descent), - aggressive gains when increasing (fast recovery). 
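# A small sketch of the background estimate used by ExposureSNRController.update()
# above: convert to 8-bit grayscale and take the 10th percentile (dark pixels) as the
# sky level. The synthetic frame below is an assumption purely for illustration.
import numpy as np
from PIL import Image

rng = np.random.default_rng(0)
frame = Image.fromarray(
    rng.normal(20, 3, size=(64, 64)).clip(0, 255).astype("uint8"), mode="L"
)

if frame.mode != "L":
    frame = frame.convert("L")
img_array = np.asarray(frame, dtype=np.float32)
background = float(np.percentile(img_array, 10))  # robust against bright star pixels
print(f"estimated background ≈ {background:.1f} ADU")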
- """ + """Initialize PID controller.""" self.target_stars = target_stars - self.gains_decrease = gains_decrease - self.gains_increase = gains_increase + self.gains = gains self.min_exposure = min_exposure self.max_exposure = max_exposure self.deadband = deadband - self.update_interval = update_interval self._integral = 0.0 self._last_error: Optional[float] = None self._zero_star_count = 0 - self._last_adjustment_time = 0.0 + self._nonzero_star_count = 0 # Hysteresis: consecutive non-zero solves self._zero_star_handler = zero_star_handler or SweepZeroStarHandler( min_exposure=min_exposure, max_exposure=max_exposure ) logger.info( f"AutoExposure PID: target={target_stars}, deadband={deadband}, " - f"update_interval={update_interval}s, " - f"gains_dec={gains_decrease}, gains_inc={gains_increase}, " - f"range=[{min_exposure}, {max_exposure}]µs" + f"gains={gains}, range=[{min_exposure}, {max_exposure}]µs" ) def reset(self) -> None: self._integral = 0.0 self._last_error = None self._zero_star_count = 0 - self._last_adjustment_time = 0.0 + self._nonzero_star_count = 0 self._zero_star_handler.reset() logger.debug("PID controller reset") @@ -706,24 +862,23 @@ def _handle_zero_stars( ) def _update_pid(self, matched_stars: int, current_exposure: int) -> Optional[int]: - """Core PID algorithm with asymmetric gains.""" + """Core PID algorithm.""" error = self.target_stars - matched_stars if abs(error) <= self.deadband: return None - # Rate limiting: only when decreasing (too many stars) - # When increasing (too few stars), respond immediately for faster recovery - if error < 0: # Too many stars, going down - current_time = time.time() - time_since_last = current_time - self._last_adjustment_time - if time_since_last < self.update_interval: - return None # Skip debug log for performance - else: - current_time = time.time() # Only get time when needed + kp, ki, kd = self.gains - # Select gains: conservative when decreasing, aggressive when increasing - kp, ki, kd = self.gains_decrease if error < 0 else self.gains_increase + # Reset integral when error changes sign to prevent accumulated integral + # from crashing exposure when conditions change suddenly + # (e.g., going from too many stars to too few stars) + if self._last_error is not None: + if (error > 0 and self._last_error < 0) or (error < 0 and self._last_error > 0): + logger.debug( + f"PID: Error sign changed ({self._last_error:.0f} → {error:.0f}), resetting integral" + ) + self._integral = 0.0 # PID calculation p_term = kp * error @@ -746,7 +901,6 @@ def _update_pid(self, matched_stars: int, current_exposure: int) -> Optional[int self._integral -= overshoot / ki self._last_error = error - self._last_adjustment_time = current_time return clamped_exposure @@ -796,27 +950,18 @@ def set_target(self, target_stars: int) -> None: self.target_stars = target_stars logger.debug(f"Target stars changed: {old_target} → {target_stars}") - def set_gains( - self, - gains_decrease: Optional[tuple] = None, - gains_increase: Optional[tuple] = None, - ) -> None: - """Update PID gains. Each tuple is (Kp, Ki, Kd).""" - if gains_decrease is not None: - self.gains_decrease = gains_decrease - if gains_increase is not None: - self.gains_increase = gains_increase - logger.debug(f"PID gains: dec={self.gains_decrease}, inc={self.gains_increase}") + def set_gains(self, gains: tuple) -> None: + """Update PID gains. 
Tuple is (Kp, Ki, Kd).""" + self.gains = gains + logger.debug(f"PID gains: {self.gains}") def get_status(self) -> dict: return { "target_stars": self.target_stars, - "gains_decrease": self.gains_decrease, - "gains_increase": self.gains_increase, + "gains": self.gains, "integral": self._integral, "last_error": self._last_error, "min_exposure": self.min_exposure, "max_exposure": self.max_exposure, "deadband": self.deadband, - "update_interval": self.update_interval, } diff --git a/python/PiFinder/camera_interface.py b/python/PiFinder/camera_interface.py index 009fc6579..9752f55c0 100644 --- a/python/PiFinder/camera_interface.py +++ b/python/PiFinder/camera_interface.py @@ -21,12 +21,14 @@ from PiFinder import state_utils, utils from PiFinder.auto_exposure import ( ExposurePIDController, + ExposureSNRController, SweepZeroStarHandler, ExponentialSweepZeroStarHandler, ResetZeroStarHandler, HistogramZeroStarHandler, generate_exposure_sweep, ) +from PiFinder.sqm.camera_profiles import detect_camera_type logger = logging.getLogger("Camera.Interface") @@ -37,7 +39,9 @@ class CameraInterface: _camera_started = False _save_next_to = None # Filename to save next capture to (None = don't save) _auto_exposure_enabled = False + _auto_exposure_mode = "pid" # "pid" or "snr" _auto_exposure_pid: Optional[ExposurePIDController] = None + _auto_exposure_snr: Optional[ExposureSNRController] = None _last_solve_time: Optional[float] = None def initialize(self) -> None: @@ -114,13 +118,18 @@ def get_image_loop( root_dir, "test_images", "pifinder_debug_02.png" ) - # 60 half-second cycles + # 60 half-second cycles (30 seconds between captures in sleep mode) sleep_delay = 60 + was_sleeping = False while True: sleeping = state_utils.sleep_for_framerate( shared_state, limit_framerate=False ) if sleeping: + # Log transition to sleep mode + if not was_sleeping: + logger.info("Camera entering low-power sleep mode") + was_sleeping = True # Even in sleep mode, we want to take photos every # so often to update positions sleep_delay -= 1 @@ -128,6 +137,10 @@ def get_image_loop( continue else: sleep_delay = 60 + logger.debug("Sleep mode: waking for periodic capture") + elif was_sleeping: + logger.info("Camera exiting low-power sleep mode") + was_sleeping = False imu_start = shared_state.imu() image_start_time = time.time() @@ -213,11 +226,35 @@ def get_image_loop( f"RMSE: {rmse_str}, Current exposure: {self.exposure_time}µs" ) - # Call PID update (now handles zero stars with recovery mode) - # Pass base_image for histogram analysis in zero-star handler - new_exposure = self._auto_exposure_pid.update( - matched_stars, self.exposure_time, base_image - ) + # Call auto-exposure update based on current mode + if self._auto_exposure_mode == "snr": + # SNR mode: use background-based controller (for SQM measurements) + if self._auto_exposure_snr is None: + # Use camera profile to derive thresholds + try: + cam_type = detect_camera_type(self.get_cam_type()) + cam_type = f"{cam_type}_processed" + self._auto_exposure_snr = ( + ExposureSNRController.from_camera_profile(cam_type) + ) + except ValueError as e: + # Unknown camera, use defaults + logger.warning( + f"Camera detection failed: {e}, using default SNR thresholds" + ) + self._auto_exposure_snr = ExposureSNRController() + # Get adaptive noise floor from shared state + adaptive_noise_floor = self.shared_state.noise_floor() + new_exposure = self._auto_exposure_snr.update( + self.exposure_time, base_image, + noise_floor=adaptive_noise_floor + ) + else: + # PID mode: use star-count based 
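# Distilled helper equivalent to the lazy SNR-controller construction in the camera
# loop above: derive thresholds from the detected camera profile, or fall back to the
# generic defaults when the camera type is unknown. The imports are the project modules
# referenced in this diff; this is a sketch of the pattern, not a drop-in replacement.
from PiFinder.auto_exposure import ExposureSNRController
from PiFinder.sqm.camera_profiles import detect_camera_type

def make_snr_controller(raw_cam_type: str) -> ExposureSNRController:
    try:
        cam_type = f"{detect_camera_type(raw_cam_type)}_processed"
        return ExposureSNRController.from_camera_profile(cam_type)
    except ValueError:
        # Unknown camera: defaults still give a usable background target
        return ExposureSNRController()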
controller (default) + # Pass base_image for histogram analysis in zero-star handler + new_exposure = self._auto_exposure_pid.update( + matched_stars, self.exposure_time, base_image + ) if ( new_exposure is not None @@ -332,6 +369,19 @@ def get_image_loop( "Cannot set AE handler: auto-exposure not initialized" ) + if command.startswith("set_ae_mode"): + mode = command.split(":")[1] + if mode in ["pid", "snr"]: + self._auto_exposure_mode = mode + console_queue.put(f"CAM: AE Mode={mode.upper()}") + logger.info( + f"Auto-exposure mode changed to: {mode.upper()}" + ) + else: + logger.warning( + f"Unknown auto-exposure mode: {mode} (valid: pid, snr)" + ) + if command == "exp_up" or command == "exp_dn": # Manual exposure adjustments disable auto-exposure self._auto_exposure_enabled = False @@ -416,11 +466,11 @@ def get_image_loop( # Disable auto-exposure during sweep self._auto_exposure_enabled = False - # Generate 100 exposure values with logarithmic spacing + # Generate 20 exposure values with logarithmic spacing # from 25ms (25000µs) to 1s (1000000µs) min_exp = 25000 # 25ms max_exp = 1000000 # 1s - num_images = 100 + num_images = 20 # Generate logarithmic sweep using shared utility sweep_exposures = generate_exposure_sweep( diff --git a/python/PiFinder/solver.py b/python/PiFinder/solver.py index 842cf683b..4d8b2e21d 100644 --- a/python/PiFinder/solver.py +++ b/python/PiFinder/solver.py @@ -35,40 +35,18 @@ def create_sqm_calculator(shared_state): - """Create a new SQM calculator instance for PROCESSED images with current calibration.""" - # Get camera type from shared state and use "_processed" profile - # since images are already processed 8-bit (not raw) + """Create a new SQM calculator instance with current calibration.""" camera_type = shared_state.camera_type() camera_type_processed = f"{camera_type}_processed" - logger.info( - f"Creating processed SQM calculator for camera: {camera_type_processed}" - ) - - return SQMCalculator( - camera_type=camera_type_processed, - use_adaptive_noise_floor=True, - ) - + logger.info(f"Creating SQM calculator for camera: {camera_type_processed}") -def create_sqm_calculator_raw(shared_state): - """Create a new SQM calculator instance for RAW 16-bit images with current calibration.""" - # Get camera type from shared state (raw profile, e.g., "imx296", "hq") - camera_type_raw = shared_state.camera_type() + return SQMCalculator(camera_type=camera_type_processed) - logger.info(f"Creating raw SQM calculator for camera: {camera_type_raw}") - return SQMCalculator( - camera_type=camera_type_raw, - use_adaptive_noise_floor=True, - ) - - -def update_sqm_dual_pipeline( +def update_sqm( shared_state, sqm_calculator, - sqm_calculator_raw, - camera_command_queue, centroids, solution, image_processed, @@ -80,22 +58,14 @@ def update_sqm_dual_pipeline( annulus_outer_radius=14, ): """ - Calculate SQM for BOTH processed (8-bit) and raw (16-bit) images. - - This function: - 1. Checks if enough time has passed since last update - 2. Calculates SQM from processed 8-bit image - 3. Captures a raw 16-bit frame, loads it, and calculates raw SQM - 4. Updates shared state with both values + Calculate SQM from image. 
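# Illustration of the log-spaced sweep requested above (20 exposures, 25 ms to 1 s).
# The loop actually calls the project's generate_exposure_sweep() utility; np.geomspace
# is used here only to show the kind of spacing expected (≈1.21x between steps).
import numpy as np

min_exp, max_exp, num_images = 25_000, 1_000_000, 20
sweep = np.geomspace(min_exp, max_exp, num_images).round().astype(int)
print(sweep[0], sweep[-1], len(sweep))  # 25000 1000000 20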
Args: shared_state: SharedStateObj instance - sqm_calculator: SQM calculator for processed images - sqm_calculator_raw: SQM calculator for raw images - camera_command_queue: Queue to send raw capture command + sqm_calculator: SQM calculator instance centroids: List of detected star centroids solution: Tetra3 solve solution with matched stars - image_processed: Processed 8-bit image array + image_processed: Processed image array (numpy) exposure_sec: Exposure time in seconds altitude_deg: Altitude in degrees for extinction correction calculation_interval_seconds: Minimum time between calculations (default: 5.0) @@ -131,8 +101,8 @@ def update_sqm_dual_pipeline( return False try: - # ========== Calculate PROCESSED (8-bit) SQM ========== - sqm_value_processed, _ = sqm_calculator.calculate( + # Calculate SQM from image + sqm_value, details = sqm_calculator.calculate( centroids=centroids, solution=solution, image=image_processed, @@ -143,48 +113,29 @@ def update_sqm_dual_pipeline( annulus_outer_radius=annulus_outer_radius, ) - # ========== Calculate RAW (16-bit) SQM from shared state ========== - sqm_value_raw = None - - try: - # Get raw frame from shared state (already captured by camera) - raw_array = shared_state.cam_raw() - - if raw_array is not None: - raw_array = np.asarray(raw_array, dtype=np.float32) - - # Calculate raw SQM - sqm_value_raw, _ = sqm_calculator_raw.calculate( - centroids=centroids, - solution=solution, - image=raw_array, - exposure_sec=exposure_sec, - altitude_deg=altitude_deg, - aperture_radius=aperture_radius, - annulus_inner_radius=annulus_inner_radius, - annulus_outer_radius=annulus_outer_radius, - ) - - except Exception as e: - logger.warning(f"Failed to calculate raw SQM: {e}") - # Continue with just processed SQM - - # ========== Update shared state with BOTH values ========== - if sqm_value_processed is not None: + # Update noise floor in shared state (for SNR auto-exposure) + noise_floor_details = details.get("noise_floor_details") + if noise_floor_details and "noise_floor_adu" in noise_floor_details: + shared_state.set_noise_floor(noise_floor_details["noise_floor_adu"]) + + # Store SQM details (filter out large per-star arrays) + filtered_details = { + k: v + for k, v in details.items() + if k + not in ("star_centroids", "star_mags", "star_fluxes", "star_local_backgrounds", "star_mzeros") + } + shared_state.set_sqm_details(filtered_details) + + # Update shared state + if sqm_value is not None: new_sqm_state = SQMState( - value=sqm_value_processed, - value_raw=sqm_value_raw, # May be None if raw failed + value=sqm_value, source="Calculated", last_update=datetime.now().isoformat(), ) shared_state.set_sqm(new_sqm_state) - - raw_str = ( - f", raw={sqm_value_raw:.2f}" - if sqm_value_raw is not None - else ", raw=N/A" - ) - logger.info(f"SQM updated: processed={sqm_value_processed:.2f}{raw_str}") + logger.info(f"SQM updated: {sqm_value:.2f} mag/arcsec²") return True except Exception as e: @@ -194,6 +145,12 @@ def update_sqm_dual_pipeline( return False +class CedarConnectionError(Exception): + """Raised when Cedar gRPC connection fails.""" + + pass + + class PFCedarDetectClient(cedar_detect_client.CedarDetectClient): def __init__(self, port=50551): """Set up the client without spawning the server as we @@ -218,6 +175,60 @@ def _get_stub(self): ) return self._stub + def extract_centroids(self, image, sigma, max_size, use_binned, detect_hot_pixels=True): + """Override to raise CedarConnectionError on gRPC failure instead of returning empty list.""" + import numpy as np + 
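# A sketch of the time gate used by update_sqm() above: recalculate only when at least
# `calculation_interval_seconds` have passed since the ISO timestamp stored in the
# shared SQM state. The values passed at the bottom are illustrative.
import time
from datetime import datetime
from typing import Optional

def should_calculate(last_update: Optional[str], interval_s: float = 5.0) -> bool:
    if last_update is None:
        return True                      # never calculated yet
    try:
        last = datetime.fromisoformat(last_update).timestamp()
    except (ValueError, AttributeError):
        return True                      # unparsable timestamp: recalculate
    return (time.time() - last) >= interval_s

print(should_calculate(None))                           # True
print(should_calculate(datetime.now().isoformat()))     # False (just updated)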
from tetra3 import cedar_detect_pb2 + + np_image = np.asarray(image, dtype=np.uint8) + (height, width) = np_image.shape + centroids_result = None + + # Use shared memory path (same machine) + if self._use_shmem: + self._alloc_shmem(size=width * height) + shimg = np.ndarray(np_image.shape, dtype=np_image.dtype, buffer=self._shmem.buf) + shimg[:] = np_image[:] + + im = cedar_detect_pb2.Image(width=width, height=height, shmem_name=self._shmem.name) + req = cedar_detect_pb2.CentroidsRequest( + input_image=im, + sigma=sigma, + max_size=max_size, + return_binned=False, + use_binned_for_star_candidates=use_binned, + detect_hot_pixels=detect_hot_pixels, + ) + try: + centroids_result = self._get_stub().ExtractCentroids(req) + except grpc.RpcError as err: + if err.code() == grpc.StatusCode.INTERNAL: + # Shared memory issue, fall back to non-shmem + self._del_shmem() + self._use_shmem = False + else: + raise CedarConnectionError(f"Cedar gRPC failed: {err.details()}") from err + + if not self._use_shmem: + im = cedar_detect_pb2.Image(width=width, height=height, image_data=np_image.tobytes()) + req = cedar_detect_pb2.CentroidsRequest( + input_image=im, + sigma=sigma, + max_size=max_size, + return_binned=False, + use_binned_for_star_candidates=use_binned, + ) + try: + centroids_result = self._get_stub().ExtractCentroids(req) + except grpc.RpcError as err: + raise CedarConnectionError(f"Cedar gRPC failed: {err.details()}") from err + + tetra_centroids = [] + if centroids_result is not None: + for sc in centroids_result.star_candidates: + tetra_centroids.append((sc.centroid_position.y, sc.centroid_position.x)) + return tetra_centroids + def __del__(self): self._del_shmem() @@ -272,13 +283,13 @@ def solver( centroids = [] log_no_stars_found = True - # Create SQM calculators (processed and raw) - can be reloaded via command queue + # Create SQM calculator - can be reloaded via command queue sqm_calculator = create_sqm_calculator(shared_state) - sqm_calculator_raw = create_sqm_calculator_raw(shared_state) while True: logger.info("Starting Solver Loop") - # Start cedar detect server + # Try to start cedar detect server, fall back to tetra3 centroider if unavailable + cedar_detect = None try: cedar_detect = PFCedarDetectClient() except FileNotFoundError as e: @@ -286,10 +297,8 @@ def solver( "Not using cedar_detect, as corresponding file '%s' could not be found", e.filename, ) - cedar_detect = None except ValueError: logger.exception("Not using cedar_detect") - cedar_detect = None try: while True: @@ -316,12 +325,9 @@ def solver( align_dec = 0 if command[0] == "reload_sqm_calibration": - logger.info( - "Reloading SQM calibration (both processed and raw)..." 
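# A minimal sketch of the shared-memory handoff used in the shmem path above: allocate
# a block, view it as an ndarray, copy the frame in, and pass the block name to the
# reader. The 512x512 zero frame is a stand-in for a real capture.
import numpy as np
from multiprocessing import shared_memory

image = np.zeros((512, 512), dtype=np.uint8)
shm = shared_memory.SharedMemory(create=True, size=image.nbytes)
view = np.ndarray(image.shape, dtype=image.dtype, buffer=shm.buf)
view[:] = image[:]                      # reader locates the data via shm.name
# ... after the reader is done:
shm.close()
shm.unlink()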
- ) + logger.info("Reloading SQM calibration...") sqm_calculator = create_sqm_calculator(shared_state) - sqm_calculator_raw = create_sqm_calculator_raw(shared_state) - logger.info("SQM calibration reloaded for both pipelines") + logger.info("SQM calibration reloaded") state_utils.sleep_for_framerate(shared_state) @@ -357,13 +363,18 @@ def solver( ] t0 = precision_timestamp() - if cedar_detect is None: - # Use old tetr3 centroider - centroids = tetra3.get_centroids_from_image(np_image) + if cedar_detect is not None: + # Try Cedar first + try: + centroids = cedar_detect.extract_centroids( + np_image, sigma=8, max_size=10, use_binned=True + ) + except CedarConnectionError as e: + logger.warning(f"Cedar connection failed: {e}, falling back to tetra3") + centroids = tetra3.get_centroids_from_image(np_image) else: - centroids = cedar_detect.extract_centroids( - np_image, sigma=8, max_size=10, use_binned=True - ) + # Cedar not available, use tetra3 + centroids = tetra3.get_centroids_from_image(np_image) t_extract = (precision_timestamp() - t0) * 1000 logger.debug( @@ -408,11 +419,9 @@ def solver( last_image_metadata["exposure_time"] / 1_000_000.0 ) - update_sqm_dual_pipeline( + update_sqm( shared_state=shared_state, sqm_calculator=sqm_calculator, - sqm_calculator_raw=sqm_calculator_raw, - camera_command_queue=camera_command_queue, centroids=centroids, solution=solution, image_processed=np_image, @@ -430,7 +439,7 @@ def solver( solved |= solution - if "T_solve" in solution: + if "T_solve" in solved: total_tetra_time = t_extract + solved["T_solve"] if total_tetra_time > 1000: console_queue.put(f"SLV: Long: {total_tetra_time}") diff --git a/python/PiFinder/sqm/__init__.py b/python/PiFinder/sqm/__init__.py index 245964e59..cf670abd0 100644 --- a/python/PiFinder/sqm/__init__.py +++ b/python/PiFinder/sqm/__init__.py @@ -2,9 +2,11 @@ SQM (Sky Quality Meter) module for calculating sky background brightness. 
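# Distilled form of the centroider selection above: prefer the Cedar client when
# present and fall back to tetra3's built-in centroider when the gRPC call fails.
# CedarConnectionError is the exception added to solver.py in this diff.
from PiFinder.solver import CedarConnectionError

def extract_centroids_with_fallback(cedar_detect, np_image, tetra3_module):
    if cedar_detect is None:
        return tetra3_module.get_centroids_from_image(np_image)
    try:
        return cedar_detect.extract_centroids(np_image, sigma=8, max_size=10, use_binned=True)
    except CedarConnectionError:
        return tetra3_module.get_centroids_from_image(np_image)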
This module provides: -- SQM: Main calculator for sky quality measurements -- NoiseFloorEstimator: Adaptive noise floor estimation -- get_camera_profile: Camera noise profile lookup +- SQM: Main calculator for sky quality measurements using aperture photometry +- NoiseFloorEstimator: Adaptive noise floor estimation with camera calibration +- CameraProfile: Dataclass containing camera hardware and noise characteristics +- get_camera_profile: Lookup camera profile by type (e.g., "imx296", "hq") +- detect_camera_type: Map hardware IDs to profile names """ from .sqm import SQM @@ -14,7 +16,7 @@ __all__ = [ "SQM", "NoiseFloorEstimator", + "CameraProfile", "get_camera_profile", "detect_camera_type", - "CameraProfile", ] diff --git a/python/PiFinder/sqm/camera_profiles.py b/python/PiFinder/sqm/camera_profiles.py index 61e86e9cc..8547284cb 100644 --- a/python/PiFinder/sqm/camera_profiles.py +++ b/python/PiFinder/sqm/camera_profiles.py @@ -10,6 +10,8 @@ from dataclasses import dataclass from typing import Dict, Tuple +import numpy as np + @dataclass class CameraProfile: @@ -80,8 +82,6 @@ def crop_and_rotate(self, raw_array): Returns: Cropped and rotated array """ - import numpy as np - # Apply cropping crop_top, crop_bottom = self.crop_y crop_left, crop_right = self.crop_x @@ -208,7 +208,7 @@ def __repr__(self) -> str: analog_gain=1.0, # Not applicable to processed images digital_gain=1.0, # Already applied during processing bit_depth=8, - bias_offset=8.0, # Conservative - below typical dark pixels (9-12) + bias_offset=6.0, # Measured from bias frames via calibration wizard read_noise_adu=1.5, # Quantization + residual noise in 8-bit dark_current_rate=0.0, # Negligible after processing thermal_coeff=0.0, diff --git a/python/PiFinder/sqm/save_sweep_metadata.py b/python/PiFinder/sqm/save_sweep_metadata.py index f9d53f6fb..518434a97 100644 --- a/python/PiFinder/sqm/save_sweep_metadata.py +++ b/python/PiFinder/sqm/save_sweep_metadata.py @@ -24,6 +24,8 @@ def save_sweep_metadata( dec_deg: Optional[float] = None, altitude_deg: Optional[float] = None, azimuth_deg: Optional[float] = None, + noise_floor_details: Optional[Dict[str, Any]] = None, + camera_type: Optional[str] = None, notes: str = "", ): """ @@ -42,6 +44,10 @@ def save_sweep_metadata( dec_deg: Declination from solver (optional) altitude_deg: Altitude angle above horizon in degrees (optional) azimuth_deg: Azimuth angle in degrees (optional) + noise_floor_details: Output from NoiseFloorEstimator.estimate_noise_floor() (optional) + Contains: noise_floor_adu, dark_pixel_raw, dark_pixel_smoothed, theoretical_floor, + temporal_noise, read_noise, dark_current_contribution, bias_offset, etc. 
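# A sketch in the spirit of CameraProfile.crop_and_rotate() above: trim the configured
# margins, then rotate. The margin values and the 90-degree-step rotation are
# placeholders for illustration; the real profile may define the rotation differently.
import numpy as np

def crop_and_rotate(raw, crop_y=(8, 8), crop_x=(8, 8), rot90_steps=0):
    crop_top, crop_bottom = crop_y
    crop_left, crop_right = crop_x
    cropped = raw[crop_top:raw.shape[0] - crop_bottom,
                  crop_left:raw.shape[1] - crop_right]
    return np.rot90(cropped, k=rot90_steps)

frame = np.arange(64 * 64, dtype=np.uint16).reshape(64, 64)
print(crop_and_rotate(frame).shape)  # (48, 48)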
+ camera_type: Camera type string (optional) notes: Any additional notes """ metadata: Dict[str, Any] = { @@ -71,6 +77,18 @@ def save_sweep_metadata( if azimuth_deg is not None: metadata["coordinates"]["azimuth_deg"] = azimuth_deg + # Noise floor estimation details (from NoiseFloorEstimator) + if noise_floor_details is not None: + metadata["noise_floor_estimator"] = { + k: v for k, v in noise_floor_details.items() + if k != "request_zero_sec_sample" # Exclude internal flags + } + if camera_type is not None: + metadata["noise_floor_estimator"]["camera_type"] = camera_type + elif camera_type is not None: + # Fallback: just record camera type if no noise floor details + metadata["noise_floor_estimator"] = {"camera_type": camera_type} + if notes: metadata["notes"] = notes diff --git a/python/PiFinder/sqm/sqm.ipynb b/python/PiFinder/sqm/sqm.ipynb index facb0cfa7..490d27c57 100644 --- a/python/PiFinder/sqm/sqm.ipynb +++ b/python/PiFinder/sqm/sqm.ipynb @@ -87,7 +87,6 @@ "logging.getLogger('Solver').setLevel(logging.WARNING)\n", "\n", "# Now try importing\n", - "from breadth_first_combinations import breadth_first_combinations\n", "\n", "\n", "import PiFinder.tetra3.tetra3 as tetra3\n", @@ -208,7 +207,7 @@ "\n", "def show_image(image):\n", " plt.imshow(image, cmap='gray')\n", - " plt.title(f\"Test image\")\n", + " plt.title(\"Test image\")\n", " plt.colorbar()\n", " plt.show() \n", "\n", @@ -1091,7 +1090,7 @@ " \n", " # Check if solve succeeded\n", " if 'matched_centroids' not in solution or len(solution['matched_centroids']) == 0:\n", - " print(f\" ❌ Failed to solve\")\n", + " print(\" ❌ Failed to solve\")\n", " results_summary.append({\n", " 'filename': filename,\n", " 'expected': info['realsqm'],\n", @@ -1129,7 +1128,7 @@ " 'status': 'OK'\n", " })\n", " else:\n", - " print(f\" ❌ Failed to calculate SQM\")\n", + " print(\" ❌ Failed to calculate SQM\")\n", " results_summary.append({\n", " 'filename': filename,\n", " 'expected': info['realsqm'],\n", @@ -1486,7 +1485,7 @@ " print(f\" Stars affected: {len(overlapping_stars)}/{len(star_centroids)} ({100*len(overlapping_stars)/len(star_centroids):.0f}%)\")\n", " print()\n", " else:\n", - " print(f\"✓ No aperture overlaps detected\\n\")\n", + " print(\"✓ No aperture overlaps detected\\n\")\n", " \n", " # Calculate alternative mzero methods - filter for valid stars (flux > 0 and mzero not None)\n", " valid_indices = [i for i in range(len(star_fluxes)) \n", @@ -1769,8 +1768,8 @@ " ax4.scatter(log_fluxes, valid_mzeros, s=80, alpha=0.7, c=colors, edgecolors='black', linewidths=1)\n", " \n", " # Horizontal lines\n", - " ax4.axhline(mzero_mean, color='blue', linestyle='-', linewidth=2, label=f'Mean', alpha=0.6)\n", - " ax4.axhline(mzero_median, color='green', linestyle='--', linewidth=2, label=f'Median', alpha=0.6)\n", + " ax4.axhline(mzero_mean, color='blue', linestyle='-', linewidth=2, label='Mean', alpha=0.6)\n", + " ax4.axhline(mzero_median, color='green', linestyle='--', linewidth=2, label='Median', alpha=0.6)\n", " \n", " # Check for trend with flux\n", " if len(valid_mzeros) >= 3:\n", @@ -1783,7 +1782,7 @@ " \n", " ax4.set_xlabel('log₁₀(Flux [ADU])', fontsize=10)\n", " ax4.set_ylabel('mzero', fontsize=10)\n", - " ax4.set_title(f'mzero vs Flux\\n(Should be flat if aperture correct)', fontsize=10, weight='bold')\n", + " ax4.set_title('mzero vs Flux\\n(Should be flat if aperture correct)', fontsize=10, weight='bold')\n", " ax4.legend(fontsize=8, loc='best')\n", " ax4.grid(True, alpha=0.3)\n", " else:\n", @@ -1798,12 +1797,12 @@ " color='steelblue', 
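# A sigma-clipped mzero estimate like the "σ-clip" line drawn in the notebook cell
# above: iteratively drop values more than n_sigma from the mean. The clip width and
# the sample values are invented for illustration.
import numpy as np

def sigma_clipped_mean(values, n_sigma=2.0, max_iter=5):
    data = np.asarray(values, dtype=float)
    for _ in range(max_iter):
        mean, std = data.mean(), data.std()
        keep = np.abs(data - mean) <= n_sigma * std
        if keep.all():
            break
        data = data[keep]
    return data.mean(), len(values) - len(data)

mzeros = [14.1, 14.3, 14.2, 14.0, 14.25, 17.8]     # one outlier (e.g. a blended star)
mean, n_clipped = sigma_clipped_mean(mzeros)
print(f"σ-clipped mzero ≈ {mean:.2f}, clipped {n_clipped} star(s)")  # ≈ 14.17, 1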
alpha=0.7, edgecolor='black')\n", " \n", " # Mark different estimators\n", - " ax5.axvline(mzero_mean, color='blue', linestyle='-', linewidth=2, label=f'Mean')\n", - " ax5.axvline(mzero_median, color='green', linestyle='--', linewidth=2, label=f'Median')\n", + " ax5.axvline(mzero_mean, color='blue', linestyle='-', linewidth=2, label='Mean')\n", + " ax5.axvline(mzero_median, color='green', linestyle='--', linewidth=2, label='Median')\n", " if n_clipped > 0:\n", - " ax5.axvline(mzero_sigclip, color='red', linestyle='-.', linewidth=2, label=f'σ-clip')\n", + " ax5.axvline(mzero_sigclip, color='red', linestyle='-.', linewidth=2, label='σ-clip')\n", " if len(valid_mzeros) >= 3:\n", - " ax5.axvline(mzero_trend, color='purple', linestyle=':', linewidth=2, label=f'Trend')\n", + " ax5.axvline(mzero_trend, color='purple', linestyle=':', linewidth=2, label='Trend')\n", " \n", " ax5.set_xlabel('mzero', fontsize=10)\n", " ax5.set_ylabel('Count', fontsize=10)\n", @@ -1995,7 +1994,7 @@ " \n", " # Print detailed overlap information\n", " if overlaps:\n", - " print(f\"\\nDETAILED OVERLAP INFORMATION:\")\n", + " print(\"\\nDETAILED OVERLAP INFORMATION:\")\n", " print(f\"{'='*100}\")\n", " for overlap in overlaps:\n", " i, j = overlap['star1_idx'], overlap['star2_idx']\n", @@ -2006,7 +2005,7 @@ " \n", " # Print summary\n", " print(f\"\\n✓ Processed {filename}\")\n", - " print(f\"\\n WITHOUT overlap correction:\")\n", + " print(\"\\n WITHOUT overlap correction:\")\n", " print(f\" SQM (mean): {sqm_val:.2f} (error: {sqm_val - info['realsqm']:+.2f})\")\n", " print(f\" SQM (median): {sqm_median:.2f} (error: {sqm_median - info['realsqm']:+.2f})\")\n", " print(f\" SQM (σ-clipped): {sqm_sigclip:.2f} (error: {sqm_sigclip - info['realsqm']:+.2f})\")\n", @@ -2014,7 +2013,7 @@ " print(f\" SQM (trend+σ-clip): {sqm_trend_sigclip:.2f} (error: {sqm_trend_sigclip - info['realsqm']:+.2f})\")\n", " \n", " if sqm_val_corrected is not None:\n", - " print(f\"\\n WITH overlap correction:\")\n", + " print(\"\\n WITH overlap correction:\")\n", " print(f\" SQM (overlap-corr): {sqm_val_corrected:.2f} (error: {sqm_val_corrected - info['realsqm']:+.2f})\")\n", " print(f\" Stars excluded: {details_corrected.get('n_stars_excluded_overlaps', 0)}/{details_corrected.get('n_matched_stars_original', 0)}\")\n", " print(f\" Improvement: {(sqm_val_corrected - sqm_val):+.2f} mag/arcsec²\")\n", @@ -2039,7 +2038,7 @@ " issues.append(f\"⚠️ {len(overlaps)} aperture overlaps detected ({len(overlapping_stars)} stars affected)\")\n", " \n", " if issues:\n", - " print(f\"\\n Notes:\")\n", + " print(\"\\n Notes:\")\n", " for issue in issues:\n", " print(f\" {issue}\")\n", " \n", diff --git a/python/PiFinder/sqm/sqm.py b/python/PiFinder/sqm/sqm.py index a1250ccbf..00d38c9fb 100644 --- a/python/PiFinder/sqm/sqm.py +++ b/python/PiFinder/sqm/sqm.py @@ -1,10 +1,11 @@ -import numpy as np +import json import logging -from typing import Tuple, Dict, Optional, Any -from datetime import datetime -import time -from PiFinder.state import SQM as SQMState -from .noise_floor import NoiseFloorEstimator +from pathlib import Path +from typing import Tuple, Dict, Optional + +import numpy as np + +from .camera_profiles import get_camera_profile logger = logging.getLogger("Solver") @@ -34,36 +35,60 @@ class SQM: def __init__( self, camera_type: str = "imx296", - pedestal_from_background: bool = False, - use_adaptive_noise_floor: bool = True, ): """ Initialize SQM calculator. 
Args: - camera_type: Camera model (imx296, imx462, imx290, hq) for noise estimation - pedestal_from_background: If True, automatically estimate pedestal from - median of local backgrounds. Default False (manual pedestal only). - use_adaptive_noise_floor: If True, use adaptive noise floor estimation. - If False, fall back to manual pedestal parameter. Default True. + camera_type: Camera model (imx296, imx462, imx290, hq). + Use "_processed" suffix for 8-bit ISP-processed images. """ - super() - self.pedestal_from_background = pedestal_from_background - self.use_adaptive_noise_floor = use_adaptive_noise_floor - - # Initialize noise floor estimator if enabled - self.noise_estimator: Optional[NoiseFloorEstimator] = None - if use_adaptive_noise_floor: - self.noise_estimator = NoiseFloorEstimator( - camera_type=camera_type, - enable_zero_sec_sampling=True, - zero_sec_interval=300, # Every 5 minutes - ) + self.camera_type = camera_type + self.profile = get_camera_profile(camera_type) + self._load_calibration() + logger.info( + f"SQM initialized (camera: {camera_type}, bias_offset: {self.profile.bias_offset})" + ) + + def _load_calibration(self) -> bool: + """ + Load calibration from JSON file if it exists. + + Calibration file is saved by the SQM calibration wizard to: + ~/PiFinder_data/sqm_calibration_{camera_type}.json + + Returns: + True if calibration was loaded, False if using defaults + """ + calibration_file = ( + Path.home() / "PiFinder_data" / f"sqm_calibration_{self.camera_type}.json" + ) + + if not calibration_file.exists(): + logger.debug(f"No calibration file found at {calibration_file}, using defaults") + return False + + try: + with open(calibration_file, "r") as f: + calibration = json.load(f) + + # Update profile with calibrated values + if "bias_offset" in calibration: + self.profile.bias_offset = float(calibration["bias_offset"]) + if "read_noise" in calibration: + self.profile.read_noise_adu = float(calibration["read_noise"]) + if "dark_current_rate" in calibration: + self.profile.dark_current_rate = float(calibration["dark_current_rate"]) + logger.info( - f"SQM initialized with adaptive noise floor estimation (camera: {camera_type})" + f"Loaded calibration from {calibration_file.name}: " + f"bias_offset={self.profile.bias_offset:.1f}" ) - else: - logger.info("SQM initialized with manual pedestal mode") + return True + + except Exception as e: + logger.warning(f"Failed to load calibration: {e}, using defaults") + return False def _calc_field_parameters(self, fov_degrees: float) -> None: """Calculate field of view parameters.""" @@ -72,43 +97,24 @@ def _calc_field_parameters(self, fov_degrees: float) -> None: self.pixels_total = 512**2 self.arcsec_squared_per_pixel = self.field_arcsec_squared / self.pixels_total - def _calculate_background( - self, image: np.ndarray, centroids: np.ndarray, exclusion_radius: int - ) -> float: + def _pickering_airmass(self, altitude_deg: float) -> float: """ - Calculate background from star-free regions using median. + Calculate airmass using Pickering (2002) formula. + + More accurate than simple 1/sin(alt) near the horizon. + Accounts for atmospheric refraction. + + Reference: Pickering, K.A. (2002), "The Southern Limits of the Ancient + Star Catalogs", DIO 12, 1-15. 
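# Example of writing a calibration file in the format _load_calibration() above reads:
# ~/PiFinder_data/sqm_calibration_{camera_type}.json with the three keys the loader
# recognises. The numeric values here are placeholders, not real wizard output.
import json
from pathlib import Path

camera_type = "imx296_processed"
calibration = {
    "bias_offset": 6.0,          # ADU, e.g. from averaged bias frames
    "read_noise": 1.5,           # ADU RMS
    "dark_current_rate": 0.0,    # ADU per second
}
path = Path.home() / "PiFinder_data" / f"sqm_calibration_{camera_type}.json"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(calibration, indent=2))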
Args: - image: Image array - centroids: All detected centroids (for masking) - exclusion_radius: Radius around each star to exclude (pixels) + altitude_deg: Altitude in degrees (must be > 0) Returns: - Background level in ADU per pixel + Airmass value (1.0 at zenith, increases toward horizon) """ - height, width = image.shape - mask = np.ones((height, width), dtype=bool) - - # Create coordinate grids - y, x = np.ogrid[:height, :width] - - # Mask out regions around all stars - for cx, cy in centroids: - if 0 <= cx < width and 0 <= cy < height: - star_mask = (x - cx) ** 2 + (y - cy) ** 2 <= exclusion_radius**2 - mask &= ~star_mask - - # Calculate median background from unmasked regions - if np.sum(mask) > 100: # Need enough pixels for reliable median - background_per_pixel = np.median(image[mask]) - else: - # Fallback to percentile if too many stars - background_per_pixel = np.percentile(image, 10) - logger.warning( - f"Using 10th percentile for background (only {np.sum(mask)} unmasked pixels)" - ) - - return float(background_per_pixel) + h = altitude_deg + return 1.0 / np.sin(np.radians(h + 244.0 / (165.0 + 47.0 * h**1.1))) def _measure_star_flux_with_local_background( self, @@ -117,25 +123,33 @@ def _measure_star_flux_with_local_background( aperture_radius: int, annulus_inner_radius: int, annulus_outer_radius: int, - ) -> Tuple[list, list]: + saturation_threshold: int = 250, + ) -> Tuple[list, list, int]: """ Measure star flux with local background from annulus around each star. Args: image: Image array - centroids: Star centroids to measure + centroids: Star centroids to measure, shape (N, 2). + Each row is (row_idx, col_idx) = (y, x) to match numpy indexing. + This is the convention used by Tetra3 matched_centroids. aperture_radius: Aperture radius for star flux in pixels annulus_inner_radius: Inner radius of background annulus in pixels annulus_outer_radius: Outer radius of background annulus in pixels + saturation_threshold: Pixel value threshold for saturation detection (default: 250) + Stars with any aperture pixel >= this value are marked saturated Returns: - Tuple of (star_fluxes, local_backgrounds) where: + Tuple of (star_fluxes, local_backgrounds, n_saturated) where: star_fluxes: Background-subtracted star fluxes (total ADU above local background) + Saturated stars have flux set to -1 to be excluded from mzero calculation local_backgrounds: Local background per pixel for each star (ADU/pixel) + n_saturated: Number of stars excluded due to saturation """ height, width = image.shape star_fluxes = [] local_backgrounds = [] + n_saturated = 0 # Pre-compute squared radii aperture_r2 = aperture_radius**2 @@ -177,8 +191,19 @@ def _measure_star_flux_with_local_background( f"Star at ({cx:.0f},{cy:.0f}) has no annulus pixels, using global median" ) + # Check for saturation in aperture + aperture_pixels = image_patch[aperture_mask] + max_aperture_pixel = np.max(aperture_pixels) if len(aperture_pixels) > 0 else 0 + + if max_aperture_pixel >= saturation_threshold: + # Mark saturated star with flux=-1 to be excluded from mzero calculation + star_fluxes.append(-1) + local_backgrounds.append(local_bg_per_pixel) + n_saturated += 1 + continue + # Total flux in aperture (includes background) - total_flux = np.sum(image_patch[aperture_mask]) + total_flux = np.sum(aperture_pixels) # Subtract background contribution aperture_area_pixels = np.sum(aperture_mask) @@ -188,28 +213,33 @@ def _measure_star_flux_with_local_background( star_fluxes.append(star_flux) local_backgrounds.append(local_bg_per_pixel) - 
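# Worked example of the Pickering (2002) airmass formula above, with a few altitudes
# to show how it diverges from the naive 1/sin(alt) near the horizon.
import numpy as np

def pickering_airmass(altitude_deg: float) -> float:
    h = altitude_deg
    return 1.0 / np.sin(np.radians(h + 244.0 / (165.0 + 47.0 * h ** 1.1)))

for alt in (90.0, 45.0, 30.0, 10.0):
    naive = 1.0 / np.sin(np.radians(alt))
    print(f"alt={alt:4.0f}°  pickering={pickering_airmass(alt):.3f}  naive={naive:.3f}")
# Both are ~1.0 at zenith; at 10° Pickering gives ~5.58 vs ~5.76 for 1/sin(alt).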
return star_fluxes, local_backgrounds + return star_fluxes, local_backgrounds, n_saturated def _calculate_mzero( self, star_fluxes: list, star_mags: list ) -> Tuple[Optional[float], list]: """ - Calculate photometric zero point from calibrated stars. + Calculate photometric zero point from calibrated stars using flux-weighted mean. For point sources: mzero = catalog_mag + 2.5 × log10(total_flux_ADU) This zero point allows converting any ADU measurement to magnitudes: mag = mzero - 2.5 × log10(flux_ADU) + Uses flux-weighted mean: brighter stars have higher SNR so their + mzero estimates are more reliable. + Args: star_fluxes: Background-subtracted star fluxes (ADU) star_mags: Catalog magnitudes for matched stars Returns: - Tuple of (mean_mzero, list_of_individual_mzeros) + Tuple of (weighted_mean_mzero, list_of_individual_mzeros) Note: The mzeros list will contain None for stars with invalid flux """ mzeros: list[Optional[float]] = [] + valid_mzeros = [] + valid_fluxes = [] for flux, mag in zip(star_fluxes, star_mags): if flux <= 0: @@ -222,16 +252,21 @@ def _calculate_mzero( # Calculate zero point: ZP = m + 2.5*log10(F) mzero = mag + 2.5 * np.log10(flux) mzeros.append(mzero) - - # Filter out None values for statistics calculation - valid_mzeros = [mz for mz in mzeros if mz is not None] + valid_mzeros.append(mzero) + valid_fluxes.append(flux) if len(valid_mzeros) == 0: logger.error("No valid stars for mzero calculation") return None, mzeros - # Return mean and the full mzeros list (which may contain None values) - return float(np.mean(valid_mzeros)), mzeros + # Flux-weighted mean: brighter stars contribute more + valid_mzeros_arr = np.array(valid_mzeros) + valid_fluxes_arr = np.array(valid_fluxes) + weighted_mzero = float( + np.average(valid_mzeros_arr, weights=valid_fluxes_arr) + ) + + return weighted_mzero, mzeros def _detect_aperture_overlaps( self, @@ -286,22 +321,20 @@ def _detect_aperture_overlaps( def _atmospheric_extinction(self, altitude_deg: float) -> float: """ - Calculate atmospheric extinction correction to above-atmosphere equivalent. - - Uses simplified airmass model and typical V-band extinction coefficient. + Calculate atmospheric extinction correction. - The atmosphere ALWAYS dims starlight - even at zenith there's 0.28 mag extinction. - This correction accounts for the total atmospheric extinction to estimate what - the sky brightness would be if measured from above the atmosphere. + Uses Pickering (2002) airmass formula for improved accuracy near horizon. + Zenith is the reference point (extinction=0), with additional extinction + added for lower altitudes. 
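# Worked example of the flux-weighted photometric zero point described above:
# mzero_i = mag_i + 2.5*log10(flux_i), with the weighted mean favouring the bright,
# high-SNR stars. The fluxes and magnitudes below are invented for illustration.
import numpy as np

star_fluxes = np.array([5200.0, 1800.0, 650.0, 240.0])   # ADU above local background
star_mags = np.array([5.1, 6.3, 7.4, 8.5])               # catalog V magnitudes

mzeros = star_mags + 2.5 * np.log10(star_fluxes)
weighted_mzero = np.average(mzeros, weights=star_fluxes)
print(mzeros.round(2), round(float(weighted_mzero), 2))
# ≈ [14.39 14.44 14.43 14.45] and weighted mzero ≈ 14.41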
Args: altitude_deg: Altitude of field center in degrees Returns: Extinction correction in magnitudes (add to measured SQM) - - At zenith (90°): 0.28 mag (minimum) - - At 45°: ~0.40 mag - - At 30°: 0.56 mag + - At zenith (90°): 0.0 mag (reference point) + - At 45°: ~0.12 mag + - At 30°: ~0.28 mag """ if altitude_deg <= 0: logger.warning( @@ -309,48 +342,52 @@ def _atmospheric_extinction(self, altitude_deg: float) -> float: ) return 0.0 - # Simplified airmass calculation - altitude_rad = np.radians(altitude_deg) - airmass = 1.0 / np.sin(altitude_rad) + # Use Pickering (2002) airmass formula for better accuracy near horizon + airmass = self._pickering_airmass(altitude_deg) - # Typical V-band extinction: 0.28 mag/airmass at sea level - # Total extinction is always present (minimum 0.28 mag at zenith) - extinction_correction = 0.28 * airmass + # V-band extinction coefficient: 0.28 mag/airmass + # Following ASTAP convention: zenith is reference point (extinction=0 at zenith) + # Only the ADDITIONAL extinction below zenith is added: k * (airmass - 1) + extinction_correction = 0.28 * (airmass - 1) return extinction_correction + def _determine_pedestal_source(self) -> str: + """Determine the source of the pedestal value for diagnostics.""" + return "bias_offset" + def calculate( self, centroids: list, solution: dict, image: np.ndarray, exposure_sec: float, - bias_image: Optional[np.ndarray] = None, altitude_deg: float = 90.0, aperture_radius: int = 5, annulus_inner_radius: int = 6, annulus_outer_radius: int = 14, - pedestal: float = 0.0, correct_overlaps: bool = False, + saturation_threshold: int = 250, + include_noise_floor_details: bool = False, ) -> Tuple[Optional[float], Dict]: """ Calculate SQM (Sky Quality Meter) value using local background annuli. Args: centroids: All detected centroids (unused, kept for compatibility) - solution: Tetra3 solution dict with 'FOV', 'matched_centroids', 'matched_stars' + solution: Tetra3 solution dict with 'FOV', 'matched_centroids', 'matched_stars'. + Note: matched_centroids uses (row, col) = (y, x) convention to match + numpy array indexing (image[row, col]). 
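# Worked numbers for the ASTAP-style extinction term above: k*(airmass - 1) with
# k = 0.28 mag/airmass, so zenith is the zero-correction reference point.
import numpy as np

def pickering_airmass(h):
    return 1.0 / np.sin(np.radians(h + 244.0 / (165.0 + 47.0 * h ** 1.1)))

for alt in (90.0, 45.0, 30.0):
    correction = 0.28 * (pickering_airmass(alt) - 1.0)
    print(f"alt={alt:.0f}°  extinction ≈ {correction:.2f} mag")
# ≈ 0.00 mag at zenith, ≈ 0.12 mag at 45°, ≈ 0.28 mag at 30°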
image: Image array (uint8 or float) - exposure_sec: Exposure time in seconds (required for adaptive noise floor) - bias_image: Optional bias/dark frame for pedestal calculation (default: None) + exposure_sec: Exposure time in seconds (required for noise floor estimation) altitude_deg: Altitude of field center for extinction correction (default: 90 = zenith) aperture_radius: Radius for star photometry in pixels (default: 5) annulus_inner_radius: Inner radius of background annulus in pixels (default: 6) annulus_outer_radius: Outer radius of background annulus in pixels (default: 14) - pedestal: Bias/pedestal level to subtract from background (default: 0) - Only used if use_adaptive_noise_floor=False - If bias_image is provided and pedestal=0, pedestal is calculated from bias_image correct_overlaps: If True, exclude stars with overlapping apertures/annuli (default: False) - Excludes CRITICAL and HIGH overlaps to prevent contamination + saturation_threshold: Pixel value threshold for saturation detection (default: 250) + include_noise_floor_details: If True, run NoiseFloorEstimator and include full output + in details dict under "noise_floor_estimator" key (default: False) Returns: Tuple of (sqm_value, details_dict) where: @@ -358,17 +395,12 @@ def calculate( details_dict: Dictionary with intermediate values for diagnostics Example: - # Using local annulus backgrounds (handles uneven backgrounds) sqm_value, details = sqm_calculator.calculate( centroids=all_centroids, solution=tetra3_solution, image=np_image, - bias_image=bias_frame, + exposure_sec=0.5, altitude_deg=45.0, - aperture_radius=5, - annulus_inner_radius=6, - annulus_outer_radius=14, - correct_overlaps=True # Exclude overlapping stars ) if sqm_value: @@ -432,61 +464,41 @@ def calculate( ) return None, {} - # 0. 
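# A sketch of the per-star measurement performed in step 1 above: aperture photometry
# with a local background annulus. A synthetic star on a flat background is used so the
# recovered flux can be checked against the injected value.
import numpy as np

def aperture_flux(image, cy, cx, r_ap=5, r_in=6, r_out=14):
    y, x = np.ogrid[:image.shape[0], :image.shape[1]]
    d2 = (y - cy) ** 2 + (x - cx) ** 2
    aperture = d2 <= r_ap ** 2
    annulus = (d2 >= r_in ** 2) & (d2 <= r_out ** 2)
    local_bg = np.median(image[annulus])                       # per-pixel sky estimate
    return float(image[aperture].sum() - local_bg * aperture.sum()), float(local_bg)

img = np.full((64, 64), 20.0)          # flat sky of 20 ADU/pixel
img[30:33, 30:33] += 100.0             # a 3x3 "star" adding 900 ADU total
flux, bg = aperture_flux(img, cy=31, cx=31)
print(round(flux), bg)                 # 900 20.0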
Determine noise floor / pedestal - noise_floor_details: Dict[str, Any] = {} + # Pedestal = bias_offset only (from camera profile) + # - Bias offset: electronic pedestal, systematic DC offset - SUBTRACT + # - Dark current: contributes to noise variance, not a DC offset - do NOT subtract + # - Read noise: random fluctuation around 0 - do NOT subtract + # Validated against SQM-L reference meter: bias_offset only gives ±0.07 mag accuracy + pedestal = self.profile.bias_offset - if self.use_adaptive_noise_floor and self.noise_estimator is not None: - # Use adaptive noise floor estimation - noise_floor, noise_floor_details = ( - self.noise_estimator.estimate_noise_floor( - image=image, - exposure_sec=exposure_sec, - percentile=5.0, - ) - ) - pedestal = noise_floor - - logger.info( - f"Adaptive noise floor: {noise_floor:.1f} ADU " - f"(dark_px={noise_floor_details['dark_pixel_smoothed']:.1f}, " - f"theory={noise_floor_details['theoretical_floor']:.1f}, " - f"valid={noise_floor_details['is_valid']})" - ) + # Calculate temporal noise for diagnostics (not used in pedestal) + read_noise = self.profile.read_noise_adu + dark_current_contribution = self.profile.dark_current_rate * exposure_sec + temporal_noise = read_noise + dark_current_contribution - # Check if zero-sec sample requested - if noise_floor_details.get("request_zero_sec_sample"): - logger.info( - "Zero-second calibration sample requested by noise estimator " - "(will be captured in next cycle)" - ) - else: - # Use manual pedestal (legacy mode) - if bias_image is not None and pedestal == 0.0: - pedestal = float(np.median(bias_image)) - logger.debug(f"Pedestal from bias: {pedestal:.2f} ADU") - elif pedestal > 0: - logger.debug(f"Using manual pedestal: {pedestal:.2f} ADU") - else: - logger.debug("No pedestal applied") + logger.debug( + f"Calibration: pedestal={pedestal:.1f} ADU (bias_offset), " + f"read_noise={read_noise:.2f}, dark_current={dark_current_contribution:.2f} " + f"(rate={self.profile.dark_current_rate:.3f} ADU/s × {exposure_sec:.2f}s), " + f"temporal_noise={temporal_noise:.2f}" + ) # 1. Measure star fluxes with local background from annulus - star_fluxes, local_backgrounds = self._measure_star_flux_with_local_background( - image, - matched_centroids_arr, - aperture_radius, - annulus_inner_radius, - annulus_outer_radius, + star_fluxes, local_backgrounds, n_saturated = ( + self._measure_star_flux_with_local_background( + image, + matched_centroids_arr, + aperture_radius, + annulus_inner_radius, + annulus_outer_radius, + saturation_threshold, + ) ) - # 1a. Estimate pedestal from median local background if enabled and not already set - if ( - self.pedestal_from_background - and pedestal == 0.0 - and len(local_backgrounds) > 0 - ): - pedestal = float(np.median(local_backgrounds)) - logger.debug( - f"Pedestal estimated from median(local_backgrounds): {pedestal:.2f} ADU" + if n_saturated > 0: + logger.info( + f"Excluded {n_saturated}/{len(matched_centroids_arr)} saturated stars " + f"(threshold={saturation_threshold})" ) # 2. Calculate sky background from median of local backgrounds @@ -516,16 +528,23 @@ def calculate( # 5. Convert background to flux density (ADU per arcsec²) background_flux_density = background_corrected / self.arcsec_squared_per_pixel - # 6. Calculate raw SQM + # 6. 
Calculate SQM (before extinction correction) if background_flux_density <= 0: logger.error(f"Invalid background flux density: {background_flux_density}") return None, {} - sqm_raw = mzero - 2.5 * np.log10(background_flux_density) + sqm_uncorrected = mzero - 2.5 * np.log10(background_flux_density) + + # 7. Apply atmospheric extinction correction (ASTAP convention) + # Following ASTAP: zenith is reference point where extinction = 0 + # Only ADDITIONAL extinction below zenith is added: 0.28 * (airmass - 1) + # This allows comparing measurements at different altitudes + extinction_for_altitude = self._atmospheric_extinction(altitude_deg) # 0.28*(airmass-1) - # 7. Apply atmospheric extinction correction - extinction_correction = self._atmospheric_extinction(altitude_deg) - sqm_final = sqm_raw + extinction_correction + # Main SQM value: no extinction correction (raw measurement) + sqm_final = sqm_uncorrected + # Altitude-corrected value: adds extinction for altitude comparison + sqm_altitude_corrected = sqm_uncorrected + extinction_for_altitude # Filter out None values for statistics in diagnostics valid_mzeros_for_stats = [mz for mz in mzeros if mz is not None] @@ -540,25 +559,16 @@ def calculate( "n_matched_stars_original": n_stars_original, "overlap_correction_enabled": correct_overlaps, "n_stars_excluded_overlaps": n_stars_excluded, + "n_stars_excluded_saturation": n_saturated, + "saturation_threshold": saturation_threshold, "background_per_pixel": background_per_pixel, "background_method": "local_annulus", "pedestal": pedestal, - "pedestal_source": ( - "adaptive_noise_floor" - if self.use_adaptive_noise_floor and self.noise_estimator is not None - else ( - "bias_image" - if bias_image is not None - else ( - "median_local_backgrounds" - if pedestal > 0 - and bias_image is None - and self.pedestal_from_background - else ("manual" if pedestal > 0 else "none") - ) - ) - ), - "noise_floor_details": noise_floor_details if noise_floor_details else None, + "pedestal_source": self._determine_pedestal_source(), + "read_noise_adu": read_noise, + "dark_current_rate": self.profile.dark_current_rate, + "dark_current_contribution": dark_current_contribution, + "temporal_noise": temporal_noise, "exposure_sec": exposure_sec, "background_corrected": background_corrected, "background_flux_density": background_flux_density, @@ -572,10 +582,11 @@ def calculate( float(np.min(valid_mzeros_for_stats)), float(np.max(valid_mzeros_for_stats)), ), - "sqm_raw": sqm_raw, + "sqm_uncorrected": sqm_uncorrected, "altitude_deg": altitude_deg, - "extinction_correction": extinction_correction, + "extinction_for_altitude": extinction_for_altitude, "sqm_final": sqm_final, + "sqm_altitude_corrected": sqm_altitude_corrected, # Per-star details for diagnostics "star_centroids": matched_centroids_arr.tolist(), "star_mags": star_mags, @@ -584,105 +595,32 @@ def calculate( "star_mzeros": mzeros, } + # Optionally include full NoiseFloorEstimator output + if include_noise_floor_details: + try: + from .noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator( + camera_type=self.camera_type, + enable_zero_sec_sampling=False, + ) + _, nf_details = estimator.estimate_noise_floor( + image=image, + exposure_sec=exposure_sec, + ) + # Remove internal flags, add camera type + nf_details.pop("request_zero_sec_sample", None) + nf_details["camera_type"] = self.camera_type + details["noise_floor_estimator"] = nf_details + except Exception as e: + logger.warning(f"Failed to get noise floor details: {e}") + 
details["noise_floor_estimator"] = {"error": str(e)} + logger.debug( f"SQM: mzero={mzero:.2f}±{np.std(valid_mzeros_for_stats):.2f}, " f"bg={background_flux_density:.6f} ADU/arcsec², pedestal={pedestal:.2f}, " - f"raw={sqm_raw:.2f}, extinction={extinction_correction:.2f}, final={sqm_final:.2f}" + f"raw={sqm_uncorrected:.2f}, ext_alt={extinction_for_altitude:.2f}, " + f"final={sqm_final:.2f}, alt_corr={sqm_altitude_corrected:.2f}" ) return sqm_final, details - - -def update_sqm_if_needed( - shared_state, - sqm_calculator: SQM, - centroids: list, - solution: dict, - image: np.ndarray, - exposure_sec: float, - altitude_deg: float, - calculation_interval_seconds: float = 5.0, - aperture_radius: int = 5, - annulus_inner_radius: int = 6, - annulus_outer_radius: int = 14, -) -> bool: - """ - Check if SQM needs updating and calculate/store new value if needed. - - This function encapsulates all the logic for time-based SQM updates: - - Checks if enough time has passed since last update - - Calculates new SQM value if needed - - Updates shared state with new SQM object - - Handles all timestamp conversions and error cases - - Args: - shared_state: SharedStateObj instance to read/write SQM state - sqm_calculator: SQM calculator instance - centroids: List of detected star centroids - solution: Tetra3 solve solution with matched stars - image: Raw image array - exposure_sec: Exposure time in seconds (required for adaptive noise floor) - altitude_deg: Altitude in degrees for extinction correction - calculation_interval_seconds: Minimum time between calculations (default: 5.0) - aperture_radius: Aperture radius for photometry (default: 5) - annulus_inner_radius: Inner annulus radius (default: 6) - annulus_outer_radius: Outer annulus radius (default: 14) - - Returns: - bool: True if SQM was calculated and updated, False otherwise - """ - # Get current SQM state from shared state - current_sqm = shared_state.sqm() - current_time = time.time() - - # Check if we should calculate SQM: - # - No previous calculation (last_update is None), OR - # - Enough time has passed since last update - should_calculate = current_sqm.last_update is None - - if current_sqm.last_update is not None: - try: - last_update_time = datetime.fromisoformat( - current_sqm.last_update - ).timestamp() - should_calculate = ( - current_time - last_update_time - ) >= calculation_interval_seconds - except (ValueError, AttributeError): - # If timestamp parsing fails, recalculate - logger.warning("Failed to parse SQM timestamp, recalculating") - should_calculate = True - - if not should_calculate: - return False - - # Calculate new SQM value - try: - sqm_value, _ = sqm_calculator.calculate( - centroids=centroids, - solution=solution, - image=image, - exposure_sec=exposure_sec, - altitude_deg=altitude_deg, - aperture_radius=aperture_radius, - annulus_inner_radius=annulus_inner_radius, - annulus_outer_radius=annulus_outer_radius, - ) - - if sqm_value is not None: - # Create new SQM state object - new_sqm_state = SQMState( - value=sqm_value, - source="Calculated", - last_update=datetime.now().isoformat(), - ) - shared_state.set_sqm(new_sqm_state) - logger.debug(f"SQM: {sqm_value:.2f} mag/arcsec²") - return True - else: - logger.warning("SQM calculation returned None") - return False - - except Exception as e: - logger.error(f"SQM calculation failed: {e}", exc_info=True) - return False diff --git a/python/PiFinder/state.py b/python/PiFinder/state.py index 03a979855..2fc6fd027 100644 --- a/python/PiFinder/state.py +++ b/python/PiFinder/state.py @@ 
-146,19 +146,13 @@ class SQM: Sky Quality Meter - represents the sky brightness measurement. """ - value: float = ( - 20.15 # mag/arcsec² - default typical dark sky value (processed 8-bit) - ) - value_raw: Optional[float] = ( - None # mag/arcsec² - from raw 16-bit pipeline (more accurate) - ) + value: float = 20.15 # mag/arcsec² - default typical dark sky value source: str = "None" # "None", "Calculated", "Manual", etc. last_update: Optional[str] = None # ISO timestamp of last update def __str__(self): - raw_str = f", raw={self.value_raw:.2f}" if self.value_raw is not None else "" return ( - f"SQM(value={self.value:.2f} mag/arcsec²{raw_str}, " + f"SQM(value={self.value:.2f} mag/arcsec², " f"source={self.source}, " f"last_update={self.last_update or 'Never'})" ) @@ -264,6 +258,8 @@ def __init__(self): self.__imu = None self.__location: Location = Location() self.__sqm: SQM = SQM() + self.__noise_floor: float = 10.0 # Adaptive noise floor in ADU (default fallback) + self.__sqm_details: dict = {} # Full SQM calculation details for calibration self.__datetime = None self.__datetime_time = None self.__screen = None @@ -363,6 +359,22 @@ def set_sqm(self, sqm: SQM): """Update the SQM value""" self.__sqm = sqm + def noise_floor(self) -> float: + """Return the adaptive noise floor in ADU""" + return self.__noise_floor + + def set_noise_floor(self, v: float): + """Update the adaptive noise floor (from SQM calculator)""" + self.__noise_floor = v + + def sqm_details(self) -> dict: + """Return the full SQM calculation details""" + return self.__sqm_details + + def set_sqm_details(self, v: dict): + """Update the SQM calculation details""" + self.__sqm_details = v + def get_sky_brightness(self): """Return just the numeric SQM value for convenience""" return self.__sqm.value diff --git a/python/PiFinder/ui/base.py b/python/PiFinder/ui/base.py index 53a0f0363..e09880ed8 100644 --- a/python/PiFinder/ui/base.py +++ b/python/PiFinder/ui/base.py @@ -25,6 +25,52 @@ def _(a) -> Any: return a +class RotatingInfoDisplay: + """Alternates between constellation and SQM with cross-fade animation.""" + + def __init__(self, shared_state, interval=3.0, fade_speed=0.15): + self.shared_state = shared_state + self.interval = interval + self.fade_speed = fade_speed + self.show_sqm = False + self.last_switch = time.time() + self.progress = 1.0 # 1.0 = stable, <1.0 = transitioning + + def _get_text(self, use_sqm): + if use_sqm: + sqm = self.shared_state.sqm() + return f"{sqm.value:.1f}" if sqm and sqm.value else "---" + else: + sol = self.shared_state.solution() + return sol.get("constellation", "---") if sol else "---" + + def update(self): + """Update state, returns (current_text, previous_text, progress).""" + now = time.time() + if now - self.last_switch >= self.interval: + self.show_sqm = not self.show_sqm + self.last_switch = now + self.progress = 0.0 + if self.progress < 1.0: + self.progress = min(1.0, self.progress + self.fade_speed) + return self._get_text(self.show_sqm), self._get_text(not self.show_sqm), self.progress + + def draw(self, draw, x, y, font, colors, max_brightness=255, inverted=False): + """Draw with cross-fade animation. 
inverted=True for dark text on light bg.""" + current, previous, progress = self.update() + if progress < 1.0: + fade_out = progress < 0.5 + t = progress * 2 if fade_out else (progress - 0.5) * 2 + if inverted: + brightness = int(max_brightness * t) if fade_out else int(max_brightness * (1 - t)) + else: + brightness = int(max_brightness * (1 - t)) if fade_out else int(max_brightness * t) + text = previous if fade_out else current + draw.text((x, y), text, font=font, fill=colors.get(brightness)) + else: + draw.text((x, y), current, font=font, fill=colors.get(0 if inverted else max_brightness)) + + class UIModule: __title__ = "BASE" __help_name__ = "" @@ -97,6 +143,9 @@ def __init__( # anim timer stuff self.last_update_time = time.time() + # Rotating info: alternates between constellation and SQM value + self._rotating_display = RotatingInfoDisplay(self.shared_state) + def active(self): """ Called when a module becomes active @@ -196,6 +245,18 @@ def message(self, message, timeout: float = 2, size=(5, 44, 123, 84)): self.ui_state.set_message_timeout(timeout + time.time()) + def _draw_titlebar_rotating_info(self, x, y, fg): + """Draw rotating constellation/SQM in title bar (dark text on gray bg).""" + self._rotating_display.draw( + self.draw, x, y, self.fonts.bold.font, self.colors, max_brightness=64, inverted=True + ) + + def draw_rotating_info(self, x=10, y=92, font=None): + """Draw rotating constellation/SQM display with cross-fade.""" + self._rotating_display.draw( + self.draw, x, y, font or self.fonts.bold.font, self.colors, max_brightness=255 + ) + def screen_update(self, title_bar=True, button_hints=True) -> None: """ called to trigger UI updates @@ -267,13 +328,11 @@ def screen_update(self, title_bar=True, button_hints=True) -> None: ) if len(self.title) < 9: - # draw the constellation - constellation = solution["constellation"] - self.draw.text( - (self.display_class.resX * 0.54, 1), - constellation, - font=self.fonts.bold.font, - fill=fg if self._unmoved else self.colors.get(32), + # Draw rotating constellation/SQM wheel (replaces static constellation) + self._draw_titlebar_rotating_info( + x=int(self.display_class.resX * 0.54), + y=1, + fg=fg if self._unmoved else self.colors.get(32), ) else: # no solve yet.... 
diff --git a/python/PiFinder/ui/callbacks.py b/python/PiFinder/ui/callbacks.py index 8c1840442..6f29dc5cc 100644 --- a/python/PiFinder/ui/callbacks.py +++ b/python/PiFinder/ui/callbacks.py @@ -113,13 +113,13 @@ def capture_exposure_sweep(ui_module: UIModule) -> None: logger.info("Starting exposure sweep capture") # Import the sweep UI module - from PiFinder.ui.exp_sweep import UIExpSweep + from PiFinder.ui.sqm_sweep import UISQMSweep # Push the sweep progress UI onto the stack # It will handle starting the sweep and showing progress sweep_item = { - "class": UIExpSweep, - "label": "exp_sweep_progress", + "class": UISQMSweep, + "label": "sqm_sweep_progress", } ui_module.add_to_stack(sweep_item) diff --git a/python/PiFinder/ui/sqm.py b/python/PiFinder/ui/sqm.py index 2cd0115f4..592e755fd 100644 --- a/python/PiFinder/ui/sqm.py +++ b/python/PiFinder/ui/sqm.py @@ -5,6 +5,7 @@ from PiFinder.ui.ui_utils import TextLayouter from PiFinder.image_util import gamma_correct_med, subtract_background import time +from pathlib import Path from typing import Any, TYPE_CHECKING from PIL import Image, ImageDraw, ImageChops, ImageOps @@ -39,17 +40,14 @@ def __init__(self, *args, **kwargs): # Marking menu definition self.marking_menu = MarkingMenu( left=MarkingMenuOption( - label=_("CALC"), + label=_("CAL"), callback=self._launch_calibration, ), down=MarkingMenuOption( - label=_("DEBUG"), - callback=self._launch_debug_sweep, - ), - right=MarkingMenuOption( label=_("CORRECT"), - callback=self._launch_correction, + callback=self._launch_sqm_sweep, ), + right=MarkingMenuOption(), ) def update(self, force=False): @@ -156,26 +154,38 @@ def update(self, force=False): fill=self.colors.get(64), ) - self.draw.text( - (10, 30), - f"{sqm:.2f}", - font=self.fonts.huge.font, - fill=self.colors.get(192), - ) - # Raw SQM value (if available) in smaller text next to main value - if sqm_state.value_raw is not None: + # Show star count and exposure time (right side) + sqm_details = self.shared_state.sqm_details() + if sqm_details: + n_stars = sqm_details.get("n_matched_stars", 0) self.draw.text( - (95, 50), - f"{sqm_state.value_raw:.2f}", + (60, 20), + f"{n_stars}★", font=self.fonts.base.font, - fill=self.colors.get(128), + fill=self.colors.get(64), ) + + image_metadata = self.shared_state.last_image_metadata() + if image_metadata and "exposure_time" in image_metadata: + exp_ms = image_metadata["exposure_time"] / 1000 # Convert µs to ms + if exp_ms >= 1000: + exp_str = f"{exp_ms/1000:.2f}s" + else: + exp_str = f"{exp_ms:.0f}ms" self.draw.text( - (95, 62), - "raw", - font=self.fonts.small.font, + (95, 20), + exp_str, + font=self.fonts.base.font, fill=self.colors.get(64), ) + + self.draw.text( + (10, 30), + f"{sqm:.2f}", + font=self.fonts.huge.font, + fill=self.colors.get(192), + ) + # Units in small, subtle text self.draw.text( (12, 68), @@ -183,18 +193,42 @@ def update(self, force=False): font=self.fonts.base.font, fill=self.colors.get(64), ) - self.draw.text( - (10, 82), - f"{details['title']}", - font=self.fonts.base.font, - fill=self.colors.get(128), - ) - self.draw.text( - (10, 92), - _("Bortle {bc}").format(bc=details["bortle_class"]), - font=self.fonts.bold.font, - fill=self.colors.get(128), - ) + + # Calibration indicator (right side of units line) + if self._is_calibrated(): + self.draw.text( + (105, 68), + "CAL", + font=self.fonts.base.font, + fill=self.colors.get(128), + ) + else: + self.draw.text( + (98, 68), + "!CAL", + font=self.fonts.base.font, + fill=self.colors.get(64), + ) + + # Show altitude-corrected SQM 
(scientific value) if available + if sqm_details: + sqm_alt = sqm_details.get("sqm_altitude_corrected") + if sqm_alt: + self.draw.text( + (12, 80), + f"alt: {sqm_alt:.2f}", + font=self.fonts.base.font, + fill=self.colors.get(64), + ) + + # Bortle class + if details: + self.draw.text( + (10, 92), + _("Bortle {bc}").format(bc=details["bortle_class"]), + font=self.fonts.base.font, + fill=self.colors.get(128), + ) # Legend details_text = _("DETAILS") @@ -223,11 +257,30 @@ def key_minus(self): if self.show_description: self.text_layout.previous() + def _is_calibrated(self) -> bool: + """Check if SQM calibration file exists for current camera.""" + camera_type = self.shared_state.camera_type() + camera_type_processed = f"{camera_type}_processed" + calibration_file = ( + Path.home() / "PiFinder_data" / f"sqm_calibration_{camera_type_processed}.json" + ) + return calibration_file.exists() + def active(self): """ Called when a module becomes active i.e. foreground controlling display """ + # Switch to SNR auto-exposure mode for stable longer exposures + self.command_queues["camera"].put("set_ae_mode:snr") + + def inactive(self): + """ + Called when a module becomes inactive + i.e. leaving the SQM screen + """ + # Switch back to PID auto-exposure mode + self.command_queues["camera"].put("set_ae_mode:pid") def _launch_calibration(self, marking_menu, selected_item): """Launch the SQM calibration wizard""" @@ -241,30 +294,18 @@ def _launch_calibration(self, marking_menu, selected_item): self.add_to_stack(calibration_def) return True # Exit marking menu - def _launch_debug_sweep(self, marking_menu, selected_item): - """Launch the exposure sweep debug tool""" - from PiFinder.ui.exp_sweep import UIExpSweep + def _launch_sqm_sweep(self, marking_menu, selected_item): + """Launch the SQM correction sweep tool""" + from PiFinder.ui.sqm_sweep import UISQMSweep sweep_def = { - "name": _("Exposure Sweep"), - "class": UIExpSweep, - "label": "exp_sweep", + "name": _("SQM Sweep"), + "class": UISQMSweep, + "label": "sqm_sweep", } self.add_to_stack(sweep_def) return True # Exit marking menu - def _launch_correction(self, marking_menu, selected_item): - """Launch the SQM correction UI""" - from PiFinder.ui.sqm_correction import UISQMCorrection - - correction_def = { - "name": _("SQM Correction"), - "class": UISQMCorrection, - "label": "sqm_correction", - } - self.add_to_stack(correction_def) - return True # Exit marking menu - def get_sky_details(self, mag_arcsec): """ Takes a mag/arcsec² value and returns corresponding Bortle scale details. 
diff --git a/python/PiFinder/ui/sqm_calibration.py b/python/PiFinder/ui/sqm_calibration.py index 5966cc311..450b54d98 100644 --- a/python/PiFinder/ui/sqm_calibration.py +++ b/python/PiFinder/ui/sqm_calibration.py @@ -268,10 +268,16 @@ def _draw_cap_off_instruction(self): fill=self.colors.get(192), ) - # Legend + # Legend - show skip option for indoor calibration self.draw.text( - (10, 110), - f"{self._SQUARE_} READY 0 CANCEL", + (10, 100), + f"{self._SQUARE_} READY", + font=self.fonts.base.font, + fill=self.colors.get(192), + ) + self.draw.text( + (10, 112), + "0 SKIP (indoor cal)", font=self.fonts.base.font, fill=self.colors.get(192), ) @@ -351,7 +357,7 @@ def _draw_progress(self, label: str, current: int, total: int): else: self.draw.text( (10, 90), - "Hold steady...", + "Keep cap on...", font=self.fonts.base.font, fill=self.colors.get(64), ) @@ -692,8 +698,11 @@ def _analyze_calibration(self): bias_stack = np.array(self.bias_frames, dtype=np.float32) self.bias_offset = float(np.median(bias_stack)) - # 2. Compute read noise (std of all pixels in all bias frames) - self.read_noise = float(np.std(bias_stack)) + # 2. Compute read noise using temporal variance (not spatial) + # Spatial std includes fixed pattern noise (PRNU), which is wrong. + # Temporal variance at each pixel measures true read noise. + temporal_variance = np.var(bias_stack, axis=0) # variance across frames per pixel + self.read_noise = float(np.sqrt(np.mean(temporal_variance))) # 3. Compute dark current rate dark_stack = np.array(self.dark_frames, dtype=np.float32) @@ -710,8 +719,9 @@ def _analyze_calibration(self): bias_stack_raw = np.array(self.bias_frames_raw, dtype=np.float32) self.bias_offset_raw = float(np.median(bias_stack_raw)) - # 2. Compute read noise from raw frames - self.read_noise_raw = float(np.std(bias_stack_raw)) + # 2. Compute read noise using temporal variance (not spatial) + temporal_variance_raw = np.var(bias_stack_raw, axis=0) + self.read_noise_raw = float(np.sqrt(np.mean(temporal_variance_raw))) # 3. 
Compute dark current rate from raw frames dark_stack_raw = np.array(self.dark_frames_raw, dtype=np.float32) @@ -764,22 +774,15 @@ def _calculate_sky_sqm(self, exposure_sec: float): # Create SQM calculator with the newly measured calibration # Use PROCESSED (8-bit) pipeline camera_type_processed = f"{self.shared_state.camera_type()}_processed" - sqm_calc = SQM( - camera_type=camera_type_processed, use_adaptive_noise_floor=True - ) + sqm_calc = SQM(camera_type=camera_type_processed) # Manually set the calibration values we just measured - if ( - sqm_calc.noise_estimator is not None - and self.bias_offset is not None - and self.read_noise is not None - and self.dark_current_rate is not None - ): - sqm_calc.noise_estimator.profile.bias_offset = self.bias_offset - sqm_calc.noise_estimator.profile.read_noise_adu = self.read_noise - sqm_calc.noise_estimator.profile.dark_current_rate = ( - self.dark_current_rate - ) + if self.bias_offset is not None: + sqm_calc.profile.bias_offset = self.bias_offset + if self.read_noise is not None: + sqm_calc.profile.read_noise_adu = self.read_noise + if self.dark_current_rate is not None: + sqm_calc.profile.dark_current_rate = self.dark_current_rate self.sqm_values = [] @@ -943,16 +946,20 @@ def key_square(self): def key_number(self, number): """Number key cancels calibration or skips sky frames""" if number == 0: + # Skip sky frames and proceed to analysis (indoor calibration) + if self.state == CalibrationState.CAP_OFF_INSTRUCTION: + logger.info("User skipped sky frames (indoor calibration)") + self.state = CalibrationState.ANALYZING + self.current_frame = 0 # If capturing sky frames, skip to analysis - if self.state == CalibrationState.CAPTURING_SKY: - logger.info("User skipped sky frame capture") + elif self.state == CalibrationState.CAPTURING_SKY: + logger.info("User skipped remaining sky frame capture") self.state = CalibrationState.ANALYZING self.current_frame = 0 # Otherwise cancel and exit elif self.state in [ CalibrationState.INTRO, CalibrationState.CAP_ON_INSTRUCTION, - CalibrationState.CAP_OFF_INSTRUCTION, ]: if self.remove_from_stack: self.remove_from_stack() diff --git a/python/PiFinder/ui/sqm_correction.py b/python/PiFinder/ui/sqm_correction.py deleted file mode 100644 index aa27f436a..000000000 --- a/python/PiFinder/ui/sqm_correction.py +++ /dev/null @@ -1,331 +0,0 @@ -""" -SQM Correction UI - Allows user to manually correct SQM values -and save correction data packages for calibration validation. -""" - -import os -import json -import zipfile -import logging -from datetime import datetime -from pathlib import Path -from typing import Any, Dict, TYPE_CHECKING -from PIL import Image -import numpy as np - -from PiFinder.ui.base import UIModule -from PiFinder.ui.numeric_entry import ( - NumericEntryField, - EntryLegend, - LegendItem, - BlinkingCursor, -) -from PiFinder import utils - -if TYPE_CHECKING: - - def _(a) -> Any: - return a - - -logger = logging.getLogger("PiFinder.SQMCorrection") - - -class UISQMCorrection(UIModule): - """ - UI for correcting SQM values and creating calibration packages. - - User enters a corrected SQM value, and the system creates a zip file containing: - - Raw 16-bit TIFF image - - Processed 8-bit PNG image - - JSON metadata with original/corrected SQM, GPS location, solve data, etc. 
- """ - - __title__ = _("SQM Correction") - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Get current SQM from shared state - sqm_state = self.shared_state.sqm() - self.original_sqm = sqm_state.value if sqm_state.value else 18.0 - - # Initialize numeric entry field with XX.XX format for SQM values - self.entry_field = NumericEntryField( - format_pattern="XX.XX", - validation_range=(10.0, 23.0), - placeholder_char="_", - ) - - # Initialize blinking cursor - self.cursor = BlinkingCursor(blink_interval=0.5) - - # Initialize legend with proper icons (copied from radec_entry.py) - back_icon = "" - go_icon = "" - - self.legend = EntryLegend( - items=[ - LegendItem(icon=back_icon, label=_("Cancel")), # left arrow - LegendItem(icon=go_icon, label=_("Save")), # right arrow - LegendItem(icon="- ", label=_("Del")), # minus with space - ], - show_separator=True, - layout="single_line", - ) - - self.error_message = None - self.success_message = None - self.message_time = None - - def update(self, force=False): - """Draw the correction UI""" - self.clear_screen() - - # Title - title = _("SQM Correction") - self.draw.text( - (0, 5), - title, - font=self.fonts.bold.font, - fill=self.colors.get(255), - ) - - # Show original SQM value - original_text = _("Original: {sqm:.2f}").format(sqm=self.original_sqm) - self.draw.text( - (0, 25), - original_text, - font=self.fonts.base.font, - fill=self.colors.get(128), - ) - - # Show correction input label - corrected_label = _("Corrected:") - self.draw.text( - (0, 45), - corrected_label, - font=self.fonts.base.font, - fill=self.colors.get(192), - ) - - # Calculate centered position for entry field - entry_y = 60 - char_width = self.fonts.large.width - total_width = char_width * len(self.entry_field.positions) - entry_x = (128 - total_width) // 2 - - # Draw numeric entry field using component with blinking cursor - self.entry_field.draw( - draw=self.draw, - screen=self.screen, - x=entry_x, - y=entry_y, - font=self.fonts.large.font, - char_width=char_width, - char_height=self.fonts.large.height, - normal_color=self.colors.get(255), - blinking_cursor=self.cursor, - ) - - # Show error or success message - message_y = 85 - if self.error_message and self.message_time: - # Show error for 3 seconds - if (datetime.now() - self.message_time).total_seconds() < 3: - self.draw.text( - (0, message_y), - self.error_message, - font=self.fonts.base.font, - fill=self.colors.get(255, red=True), - ) - else: - self.error_message = None - self.message_time = None - - if self.success_message and self.message_time: - # Show success for 3 seconds, then exit - if (datetime.now() - self.message_time).total_seconds() < 3: - self.draw.text( - (0, message_y), - self.success_message, - font=self.fonts.base.font, - fill=self.colors.get(255), - ) - else: - # Auto-exit after showing success message - if self.remove_from_stack: - self.remove_from_stack() - - # Draw legend at bottom using component - self.legend.draw( - draw=self.draw, - screen_width=128, - screen_height=128, - font=self.fonts.base.font, - font_height=self.fonts.base.height, - separator_color=self.colors.get(128), - text_color=self.colors.get(128), - margin=2, - ) - - return self.screen_update() - - def key_number(self, number): - """Handle number key input""" - self.entry_field.insert_digit(number) - - def key_0(self): - """Handle 0 key""" - self.entry_field.insert_digit(0) - - def key_plus(self): - """Delete - remove digit at cursor""" - self.entry_field.delete_digit() - - def key_minus(self): - 
"""Delete - remove digit at cursor""" - self.entry_field.delete_digit() - - def key_left(self): - """Cancel and exit""" - if self.remove_from_stack: - self.remove_from_stack() - - def key_right(self): - """Save correction package""" - # Validate input using entry field component - is_valid, corrected_sqm = self.entry_field.validate() - - if not is_valid: - if corrected_sqm is None: - self.error_message = _("Enter a value") - else: - self.error_message = _("Range: 10-23") - self.message_time = datetime.now() - return - - # Create correction package - try: - zip_path = self._create_correction_package(corrected_sqm) - self.success_message = _("Saved: {filename}").format( - filename=os.path.basename(zip_path) - ) - self.message_time = datetime.now() - logger.info(f"SQM correction package saved: {zip_path}") - except Exception as e: - logger.error(f"Failed to create correction package: {e}") - self.error_message = _("Save failed") - self.message_time = datetime.now() - - def _create_correction_package(self, corrected_sqm: float) -> str: - """ - Create a zip file containing correction data. - - Returns: - Path to created zip file - """ - # Create corrections directory (consistent with captures/ and solver_debug_dumps/) - corrections_dir = Path(utils.data_dir) / "captures" / "sqm_corrections" - corrections_dir.mkdir(parents=True, exist_ok=True) - - # Generate timestamp for filename - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - zip_filename = f"sqm_correction_{timestamp}.zip" - zip_path = corrections_dir / zip_filename - - # Capture current camera image - camera_image = self.camera_image.copy() - - # Get raw image from shared state - raw_image = self.shared_state.cam_raw() - - # Collect metadata - metadata = self._collect_metadata(corrected_sqm) - - # Create zip file - with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: - # Add processed 8-bit PNG - processed_path = f"correction_{timestamp}_processed.png" - camera_image.save(str(corrections_dir / processed_path)) - zf.write(corrections_dir / processed_path, arcname=processed_path) - (corrections_dir / processed_path).unlink() # Clean up temp file - - # Add raw 16-bit TIFF if available - if raw_image is not None: - raw_path = f"correction_{timestamp}_raw.tiff" - raw_image_pil = Image.fromarray(np.asarray(raw_image, dtype=np.uint16)) - raw_image_pil.save(str(corrections_dir / raw_path)) - zf.write(corrections_dir / raw_path, arcname=raw_path) - (corrections_dir / raw_path).unlink() # Clean up temp file - - # Add metadata JSON - metadata_path = f"correction_{timestamp}_metadata.json" - with open(corrections_dir / metadata_path, "w") as f: - json.dump(metadata, f, indent=2, default=str) - zf.write(corrections_dir / metadata_path, arcname=metadata_path) - (corrections_dir / metadata_path).unlink() # Clean up temp file - - return str(zip_path) - - def _collect_metadata(self, corrected_sqm: float) -> Dict[str, Any]: - """Collect all relevant metadata for the correction package""" - metadata: Dict[str, Any] = { - "timestamp": datetime.now().isoformat(), - "sqm": { - "original": self.original_sqm, - "corrected": corrected_sqm, - "difference": corrected_sqm - self.original_sqm, - }, - } - - # Get SQM state for raw value - sqm_state = self.shared_state.sqm() - if sqm_state.value_raw is not None: - metadata["sqm"]["original_raw"] = sqm_state.value_raw - - # Get GPS location - location = self.shared_state.location() - if location and location.lock: - metadata["location"] = { - "latitude": location.lat, - "longitude": location.lon, - 
"altitude_m": location.altitude, - "timezone": location.timezone, - } - - # Get GPS datetime - gps_datetime = self.shared_state.datetime() - if gps_datetime: - metadata["gps_datetime"] = gps_datetime.isoformat() - - # Get solve data (RA/Dec/Alt/Az) - solution = self.shared_state.solution() - if solution: - metadata["solve"] = { - "ra_deg": solution.get("RA"), - "dec_deg": solution.get("Dec"), - "altitude_deg": solution.get("Alt"), - "azimuth_deg": solution.get("Az"), - "fov_deg": solution.get("FOV"), - "matches": solution.get("Matches"), - "rmse": solution.get("RMSE"), - } - - # Get image metadata (exposure, gain, etc.) - image_metadata = self.shared_state.last_image_metadata() - if image_metadata: - metadata["image"] = { - "exposure_us": image_metadata.get("exposure_time"), - "exposure_sec": image_metadata.get("exposure_time", 0) / 1_000_000.0, - "gain": image_metadata.get("gain"), - } - - # Get camera type - metadata["camera_type"] = self.shared_state.camera_type() - - return metadata - - def active(self): - """Called when module becomes active""" - pass diff --git a/python/PiFinder/ui/exp_sweep.py b/python/PiFinder/ui/sqm_sweep.py similarity index 69% rename from python/PiFinder/ui/exp_sweep.py rename to python/PiFinder/ui/sqm_sweep.py index 77115bda9..56e8310b2 100644 --- a/python/PiFinder/ui/exp_sweep.py +++ b/python/PiFinder/ui/sqm_sweep.py @@ -1,17 +1,25 @@ #!/usr/bin/python # -*- coding:utf-8 -*- """ -Exposure Sweep UI +SQM Sweep UI -Multi-step wizard for capturing exposure sweeps with reference SQM. +Multi-step wizard for capturing exposure sweeps with reference SQM +and full metadata for calibration validation. """ +import json +import logging import time from enum import Enum from pathlib import Path + +import numpy as np + from PiFinder import utils from PiFinder.ui.base import UIModule +logger = logging.getLogger("PiFinder.SQMSweep") + class SweepState(Enum): """Sweep wizard states""" @@ -22,18 +30,18 @@ class SweepState(Enum): COMPLETE = "complete" # Sweep done -class UIExpSweep(UIModule): +class UISQMSweep(UIModule): """ - Exposure Sweep Wizard + SQM Sweep Wizard Steps: 1. Ask user for reference SQM value from external meter 2. Confirm ready to start - 3. Capture 100 image sweep - 4. Save metadata and exit + 3. Capture exposure sweep images + 4. 
Save full metadata (SQM details, solve, noise floor estimator) """ - __title__ = "EXP SWEEP" + __title__ = "SQM SWEEP" __help_name__ = "" def __init__(self, *args, **kwargs): @@ -45,8 +53,8 @@ def __init__(self, *args, **kwargs): self.start_time = None self.sweep_dir = None # Track the actual sweep directory self.initial_file_count = None # Files that existed before we started - self.total_images = 100 # Expected number of images - self.estimated_duration = 240 # 4 minutes estimated + self.total_images = 20 # Expected number of images + self.estimated_duration = 60 # ~1 minute estimated def active(self): """Called when module becomes active""" @@ -153,13 +161,13 @@ def _draw_confirm(self): self.draw.text( (10, 65), - "100 images", + "20 images", font=self.fonts.base.font, fill=self.colors.get(192), ) self.draw.text( (10, 77), - "~4 minutes", + "~1 minute", font=self.fonts.base.font, fill=self.colors.get(192), ) @@ -242,6 +250,7 @@ def _draw_capturing(self): # Auto-complete when all files captured if file_count >= self.total_images: + self._add_detailed_metadata() self.state = SweepState.COMPLETE def _get_sweep_files_since_start(self): @@ -271,6 +280,102 @@ def _get_sweep_files_since_start(self): # If anything fails, return 0 to avoid crashing the UI return 0 + def _add_detailed_metadata(self): + """Add detailed metadata including SQM state and NoiseFloorEstimator output.""" + try: + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + # Find the sweep directory + captures_dir = Path(utils.data_dir) / "captures" + sweep_dirs = [ + d for d in captures_dir.glob("sweep_*") + if d.stat().st_ctime >= (self.start_time - 1) + ] + if not sweep_dirs: + logger.warning("No sweep directory found for metadata update") + return + + sweep_dir = max(sweep_dirs, key=lambda p: p.stat().st_ctime) + metadata_file = sweep_dir / "sweep_metadata.json" + + if not metadata_file.exists(): + logger.warning(f"No metadata file found at {metadata_file}") + return + + # Load existing metadata + with open(metadata_file, "r") as f: + metadata = json.load(f) + + # Add current SQM state (like sqm_correction does) + sqm_state = self.shared_state.sqm() + if sqm_state: + metadata["sqm"] = { + "pifinder_value": sqm_state.value, + "reference_value": self.reference_sqm, + "difference": (self.reference_sqm - sqm_state.value) + if self.reference_sqm and sqm_state.value else None, + "source": sqm_state.source, + } + + # Add full SQM calculation details + sqm_details = self.shared_state.sqm_details() + if sqm_details: + metadata["sqm_calculation"] = sqm_details + + # Add image metadata + image_metadata = self.shared_state.last_image_metadata() + if image_metadata: + metadata["image"] = { + "exposure_us": image_metadata.get("exposure_time"), + "exposure_sec": image_metadata.get("exposure_time", 0) / 1_000_000.0, + "gain": image_metadata.get("gain"), + "imu_delta": image_metadata.get("imu_delta"), + } + + # Add solve data + solution = self.shared_state.solution() + if solution: + metadata["solve"] = { + "ra_deg": solution.get("RA"), + "dec_deg": solution.get("Dec"), + "altitude_deg": solution.get("Alt"), + "azimuth_deg": solution.get("Az"), + "fov_deg": solution.get("FOV"), + "matches": solution.get("Matches"), + "rmse": solution.get("RMSE"), + } + + # Add NoiseFloorEstimator output + camera_type = self.shared_state.camera_type() + camera_type_processed = f"{camera_type}_processed" + exposure_sec = (image_metadata.get("exposure_time", 500000) / 1_000_000.0 + if image_metadata else 0.5) + + if self.camera_image is not None: + 
image_array = np.array(self.camera_image.convert("L")) + + estimator = NoiseFloorEstimator( + camera_type=camera_type_processed, + enable_zero_sec_sampling=False, + ) + _, nf_details = estimator.estimate_noise_floor( + image=image_array, + exposure_sec=exposure_sec, + ) + + nf_details.pop("request_zero_sec_sample", None) + nf_details["camera_type"] = camera_type_processed + metadata["noise_floor_estimator"] = nf_details + + # Save updated metadata + with open(metadata_file, "w") as f: + json.dump(metadata, f, indent=2) + + logger.info(f"Added detailed metadata to {metadata_file}") + + except Exception as e: + logger.error(f"Failed to add detailed metadata: {e}") + def _draw_complete(self): """Draw completion screen""" self.draw.text( diff --git a/python/tests/test_auto_exposure.py b/python/tests/test_auto_exposure.py index 5f4c24a57..bca7bed58 100644 --- a/python/tests/test_auto_exposure.py +++ b/python/tests/test_auto_exposure.py @@ -77,14 +77,15 @@ def test_initialization(self): handler = SweepZeroStarHandler() assert not handler.is_active() assert handler._trigger_count == 2 + # Sweep starts at 400ms, goes up, then tries shorter exposures assert handler._exposures == [ - 25000, - 50000, - 100000, - 200000, 400000, 800000, 1000000, + 200000, + 100000, + 50000, + 25000, ] assert handler._repeats_per_exposure == 2 @@ -104,78 +105,74 @@ def test_trigger_delay(self): assert result is None assert not handler.is_active() - # Second zero - should activate and return first exposure + # Second zero - should activate and return first exposure (400ms) result = handler.handle(50000, 2) - assert result == 25000 + assert result == 400000 assert handler.is_active() - def test_sweep_pattern_short_exposures(self): - """Sweep repeats each short exposure 2 times.""" + def test_sweep_pattern_start_at_400ms(self): + """Sweep starts at 400ms and repeats each exposure 2 times.""" handler = SweepZeroStarHandler(trigger_count=1) - # Activate sweep + # Activate sweep - starts at 400ms result = handler.handle(50000, 1) - assert result == 25000 # First attempt at 25ms + assert result == 400000 # First attempt at 400ms - # Second attempt at 25ms + # Second attempt at 400ms result = handler.handle(50000, 2) - assert result == 25000 + assert result == 400000 - # Move to 50ms + # Move to 800ms result = handler.handle(50000, 3) - assert result == 50000 + assert result == 800000 - # Second attempt at 50ms + # Second attempt at 800ms result = handler.handle(50000, 4) - assert result == 50000 + assert result == 800000 - # Move to 100ms + # Move to 1000ms result = handler.handle(50000, 5) - assert result == 100000 + assert result == 1000000 - def test_400ms_normal_behavior(self): - """Sweep treats 400ms like other exposures (2 attempts).""" + def test_sweep_continues_to_shorter_exposures(self): + """After longer exposures, sweep tries shorter ones.""" handler = SweepZeroStarHandler(trigger_count=1) - # Skip to 400ms by advancing through earlier exposures - # 25ms: 2 times - handler.handle(50000, 1) # 25ms attempt 1 - handler.handle(50000, 2) # 25ms attempt 2 - # 50ms: 2 times - handler.handle(50000, 3) # 50ms attempt 1 - handler.handle(50000, 4) # 50ms attempt 2 - # 100ms: 2 times - handler.handle(50000, 5) # 100ms attempt 1 - handler.handle(50000, 6) # 100ms attempt 2 - # 200ms: 2 times - handler.handle(50000, 7) # 200ms attempt 1 - handler.handle(50000, 8) # 200ms attempt 2 - - # Now at 400ms - should repeat 2 times like other exposures - result = handler.handle(50000, 9) - assert result == 400000 # Attempt 1 + # 400ms: 2 
times (first in sweep) + handler.handle(50000, 1) # 400ms attempt 1 + handler.handle(50000, 2) # 400ms attempt 2 + # 800ms: 2 times + handler.handle(50000, 3) # 800ms attempt 1 + handler.handle(50000, 4) # 800ms attempt 2 + # 1000ms: 2 times + handler.handle(50000, 5) # 1000ms attempt 1 + handler.handle(50000, 6) # 1000ms attempt 2 + + # Now at 200ms - sweep continues with shorter exposures + result = handler.handle(50000, 7) + assert result == 200000 # Attempt 1 - result = handler.handle(50000, 10) - assert result == 400000 # Attempt 2 + result = handler.handle(50000, 8) + assert result == 200000 # Attempt 2 - # After 2 cycles, move to 800ms - result = handler.handle(50000, 11) - assert result == 800000 + # Move to 100ms + result = handler.handle(50000, 9) + assert result == 100000 def test_sweep_wraps_around(self): - """Sweep wraps back to minimum after reaching maximum.""" + """Sweep wraps back to start (400ms) after completing all exposures.""" handler = SweepZeroStarHandler(trigger_count=1) # Fast-forward through entire sweep - # 25ms (2×), 50ms (2×), 100ms (2×), 200ms (2×), 400ms (2×), 800ms (2×), 1000ms (2×) + # 400ms (2×), 800ms (2×), 1000ms (2×), 200ms (2×), 100ms (2×), 50ms (2×), 25ms (2×) total_cycles = 2 + 2 + 2 + 2 + 2 + 2 + 2 # = 14 for i in range(1, total_cycles + 1): handler.handle(50000, i) - # Next cycle should wrap to 25ms + # Next cycle should wrap to 400ms (start of sweep) result = handler.handle(50000, total_cycles + 1) - assert result == 25000 + assert result == 400000 def test_reset(self): """Reset clears handler state.""" @@ -331,8 +328,7 @@ def test_initialization_defaults(self): """Controller initializes with default parameters.""" pid = ExposurePIDController() assert pid.target_stars == 17 - assert pid.gains_decrease == (500.0, 5.0, 250.0) - assert pid.gains_increase == (4000.0, 250.0, 1500.0) + assert pid.gains == (2000.0, 100.0, 750.0) assert pid.min_exposure == 25000 assert pid.max_exposure == 1000000 assert pid.deadband == 5 @@ -383,7 +379,7 @@ def test_pid_decreases_exposure_for_too_many_stars(self): def test_pid_clamps_to_min_exposure(self): """PID clamps output to minimum exposure.""" pid = ExposurePIDController( - target_stars=15, min_exposure=25000, gains_decrease=(50000.0, 0.0, 0.0) + target_stars=15, min_exposure=25000, gains=(50000.0, 0.0, 0.0) ) # Many stars should drive exposure down to minimum @@ -393,7 +389,7 @@ def test_pid_clamps_to_min_exposure(self): def test_pid_clamps_to_max_exposure(self): """PID clamps output to maximum exposure.""" pid = ExposurePIDController( - target_stars=15, max_exposure=1000000, gains_increase=(50000.0, 0.0, 0.0) + target_stars=15, max_exposure=1000000, gains=(50000.0, 0.0, 0.0) ) # Very few stars should drive exposure up to maximum @@ -466,15 +462,8 @@ def test_set_gains(self): """set_gains updates PID coefficients.""" pid = ExposurePIDController() - # Update decrease gains - pid.set_gains(gains_decrease=(5000.0, 300.0, 2000.0)) - assert pid.gains_decrease == (5000.0, 300.0, 2000.0) - assert pid.gains_increase == (4000.0, 250.0, 1500.0) # Unchanged (new default) - - # Update increase gains - pid.set_gains(gains_increase=(10000.0, 600.0, 4000.0)) - assert pid.gains_increase == (10000.0, 600.0, 4000.0) - assert pid.gains_decrease == (5000.0, 300.0, 2000.0) # Unchanged + pid.set_gains((5000.0, 300.0, 2000.0)) + assert pid.gains == (5000.0, 300.0, 2000.0) def test_get_status(self): """get_status returns controller state.""" @@ -484,8 +473,7 @@ def test_get_status(self): status = pid.get_status() assert 
status["target_stars"] == 15 - assert status["gains_decrease"] == (500.0, 5.0, 250.0) # New defaults - assert status["gains_increase"] == (4000.0, 250.0, 1500.0) # New defaults + assert status["gains"] == (2000.0, 100.0, 750.0) assert status["min_exposure"] == 25000 assert status["max_exposure"] == 1000000 assert status["deadband"] == 2 @@ -509,18 +497,18 @@ def test_full_zero_star_recovery_cycle(self): result = pid.update(0, current_exposure) assert result is None # Trigger count = 2 - # Zero stars (second time) - sweep activates + # Zero stars (second time) - sweep activates at 400ms result = pid.update(0, current_exposure) - assert result == 25000 # Sweep starts at 25ms + assert result == 400000 # Sweep starts at 400ms assert pid._zero_star_handler.is_active() # Continue sweep result = pid.update(0, current_exposure) - assert result == 25000 # Second attempt at 25ms + assert result == 400000 # Second attempt at 400ms # Still zero stars, sweep continues result = pid.update(0, current_exposure) - assert result == 50000 # Move to 50ms + assert result == 800000 # Move to 800ms # Find stars again - return to PID result = pid.update(15, 50000) @@ -532,8 +520,7 @@ def test_pid_proportional_response(self): """Test that PID responds proportionally to error magnitude.""" pid = ExposurePIDController( target_stars=15, - gains_decrease=(1000.0, 0.0, 0.0), - gains_increase=(1000.0, 0.0, 0.0), + gains=(1000.0, 0.0, 0.0), deadband=0, ) @@ -557,8 +544,7 @@ def test_integral_windup_protection(self): """Test that integral term is clamped to prevent windup.""" pid = ExposurePIDController( target_stars=15, - gains_decrease=(0.0, 100.0, 0.0), - gains_increase=(0.0, 100.0, 0.0), + gains=(0.0, 100.0, 0.0), min_exposure=25000, max_exposure=1000000, ) @@ -570,17 +556,14 @@ def test_integral_windup_protection(self): current_exposure = result # Integral should be clamped, not infinite - max_integral = (pid.max_exposure - pid.min_exposure) / ( - 2.0 * pid.gains_increase[1] - ) + max_integral = (pid.max_exposure - pid.min_exposure) / (2.0 * pid.gains[1]) assert abs(pid._integral) <= max_integral def test_derivative_dampens_oscillation(self): """Test that derivative term responds to rate of change.""" pid = ExposurePIDController( target_stars=15, - gains_decrease=(0.0, 0.0, 1000.0), - gains_increase=(0.0, 0.0, 1000.0), + gains=(0.0, 0.0, 1000.0), deadband=0, ) diff --git a/python/tests/test_sqm.py b/python/tests/test_sqm.py index c689ca0cd..9ad1f3381 100644 --- a/python/tests/test_sqm.py +++ b/python/tests/test_sqm.py @@ -1,7 +1,16 @@ +import json + import pytest import numpy as np from PiFinder.sqm import SQM +from PiFinder.sqm.noise_floor import NoiseFloorEstimator +from PiFinder.sqm.camera_profiles import ( + CameraProfile, + get_camera_profile, + detect_camera_type, +) +from PiFinder.sqm.save_sweep_metadata import save_sweep_metadata @pytest.mark.unit @@ -11,23 +20,30 @@ class TestSQMExtinction: """ def test_extinction_at_zenith(self): - """Test that extinction at zenith is exactly 0.28 mag (1.0 airmass)""" + """Test that extinction at zenith is 0.0 mag (ASTAP convention: zenith is reference)""" sqm = SQM() extinction = sqm._atmospheric_extinction(90.0) - assert extinction == pytest.approx(0.28, abs=0.0001) + assert extinction == pytest.approx(0.0, abs=0.0001) def test_extinction_at_45_degrees(self): - """Test extinction at 45° altitude (airmass ≈ 1.414)""" + """Test extinction at 45° altitude using Pickering airmass""" sqm = SQM() extinction = sqm._atmospheric_extinction(45.0) - expected = 0.28 * 
1.414213562373095 # 0.28 * sqrt(2) + # Pickering (2002) airmass at 45° ≈ 1.4124 + # ASTAP convention: 0.28 * (airmass - 1) + pickering_airmass = sqm._pickering_airmass(45.0) + expected = 0.28 * (pickering_airmass - 1) assert extinction == pytest.approx(expected, abs=0.001) def test_extinction_at_30_degrees(self): - """Test extinction at 30° altitude (airmass = 2.0)""" + """Test extinction at 30° altitude using Pickering airmass""" sqm = SQM() extinction = sqm._atmospheric_extinction(30.0) - assert extinction == pytest.approx(0.56, abs=0.001) # 0.28 * 2.0 + # Pickering (2002) airmass at 30° ≈ 1.995 + # ASTAP convention: 0.28 * (airmass - 1) ≈ 0.279 + pickering_airmass = sqm._pickering_airmass(30.0) + expected = 0.28 * (pickering_airmass - 1) + assert extinction == pytest.approx(expected, abs=0.001) def test_extinction_increases_toward_horizon(self): """Test that extinction increases as altitude decreases""" @@ -42,10 +58,13 @@ def test_extinction_increases_toward_horizon(self): ), f"Extinction at {altitudes[i]}° should be less than at {altitudes[i+1]}°" def test_extinction_minimum_is_at_zenith(self): - """Test that zenith (90°) has the minimum possible extinction""" + """Test that zenith (90°) has zero extinction (ASTAP convention)""" sqm = SQM() zenith_extinction = sqm._atmospheric_extinction(90.0) + # Zenith should have exactly zero extinction + assert zenith_extinction == pytest.approx(0.0, abs=0.0001) + # Test various altitudes - all should have more extinction than zenith test_altitudes = [89, 80, 70, 60, 50, 40, 30, 20, 10] for alt in test_altitudes: @@ -63,27 +82,87 @@ def test_extinction_invalid_altitude(self): assert sqm._atmospheric_extinction(-90.0) == 0.0 def test_extinction_airmass_relationship(self): - """Test the airmass formula: airmass = 1 / sin(altitude)""" + """Test the Pickering airmass formula: extinction = 0.28 * (airmass - 1)""" sqm = SQM() - # At 90°: airmass should be 1.0 + # At 90°: airmass ≈ 1.0, extinction ≈ 0 altitude = 90.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.0001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.0001) + assert extinction == pytest.approx(0.0, abs=0.0001) - # At 30°: airmass should be 2.0 + # At 30°: Pickering airmass ≈ 1.995 altitude = 30.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.0001) - assert airmass == pytest.approx(2.0, abs=0.001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.0001) + assert airmass == pytest.approx(1.995, abs=0.01) - # At 6°: airmass ≈ 9.6 (very close to horizon) + # At 6°: Pickering airmass is more accurate near horizon than simple formula altitude = 6.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.001) + + +@pytest.mark.unit +class TestPickeringAirmass: + """ + Unit tests for Pickering (2002) airmass formula. 
+ """ + + def test_airmass_at_zenith(self): + """Test airmass at zenith is 1.0""" + sqm = SQM() + airmass = sqm._pickering_airmass(90.0) + assert airmass == pytest.approx(1.0, abs=0.001) + + def test_airmass_at_45_degrees(self): + """Test airmass at 45° altitude""" + sqm = SQM() + airmass = sqm._pickering_airmass(45.0) + # Pickering airmass at 45° ≈ 1.413 (slightly less than simple 1/sin(45°) = 1.414) + assert airmass == pytest.approx(1.413, abs=0.01) + + def test_airmass_at_30_degrees(self): + """Test airmass at 30° altitude""" + sqm = SQM() + airmass = sqm._pickering_airmass(30.0) + # Pickering airmass at 30° ≈ 1.995 (slightly less than simple 1/sin(30°) = 2.0) + assert airmass == pytest.approx(1.995, abs=0.01) + + def test_airmass_at_10_degrees(self): + """Test airmass at 10° altitude shows Pickering correction""" + sqm = SQM() + pickering = sqm._pickering_airmass(10.0) + simple = 1.0 / np.sin(np.radians(10.0)) + # Pickering gives ~5.60, simple gives ~5.76 (3% difference) + assert pickering == pytest.approx(5.60, abs=0.05) + assert simple > pickering # Simple overestimates at low altitudes + + def test_airmass_at_5_degrees(self): + """Test airmass at 5° altitude shows significant Pickering correction.""" + sqm = SQM() + pickering = sqm._pickering_airmass(5.0) + simple = 1.0 / np.sin(np.radians(5.0)) + # Pickering gives ~10.3, simple gives ~11.47 (10% difference) + # Expected range from literature: ≈ 9-10 + assert pickering == pytest.approx(10.3, abs=0.5) + assert simple == pytest.approx(11.47, abs=0.1) + assert simple > pickering # Simple significantly overestimates near horizon + + def test_airmass_increases_toward_horizon(self): + """Test airmass increases monotonically toward horizon""" + sqm = SQM() + altitudes = [90, 60, 45, 30, 20, 10, 5] + airmasses = [sqm._pickering_airmass(alt) for alt in altitudes] + + for i in range(len(airmasses) - 1): + assert airmasses[i] < airmasses[i + 1], ( + f"Airmass at {altitudes[i]}° ({airmasses[i]:.3f}) should be less than " + f"at {altitudes[i+1]}° ({airmasses[i+1]:.3f})" + ) @pytest.mark.unit @@ -94,6 +173,7 @@ class TestSQMCalculation: def test_calculate_returns_tuple(self): """Test that calculate() returns a tuple (value, details_dict)""" + np.random.seed(42) sqm = SQM() # Create minimal mock data @@ -110,9 +190,9 @@ def test_calculate_returns_tuple(self): centroids = [[100, 100], [200, 200], [300, 300]] image = np.random.randint(800, 1200, (512, 512), dtype=np.uint16) - # Add bright spots for stars - for x, y in centroids: - image[y - 2 : y + 3, x - 2 : x + 3] += 5000 + # Add bright spots for stars at (row, col) positions + for row, col in centroids: + image[row - 2 : row + 3, col - 2 : col + 3] += 5000 result = sqm.calculate( centroids=centroids, @@ -133,7 +213,8 @@ def test_calculate_returns_tuple(self): assert isinstance(result[1], dict) def test_calculate_extinction_applied(self): - """Test that extinction correction is applied to final SQM value""" + """Test that extinction correction follows ASTAP convention""" + np.random.seed(42) sqm = SQM() solution = { @@ -149,46 +230,57 @@ def test_calculate_extinction_applied(self): centroids = [[100, 100], [200, 200], [300, 300]] image = np.random.randint(800, 1200, (512, 512), dtype=np.uint16) - for x, y in centroids: - image[y - 2 : y + 3, x - 2 : x + 3] += 5000 + for row, col in centroids: + image[row - 2 : row + 3, col - 2 : col + 3] += 5000 # Calculate at zenith - sqm_zenith, details_zenith = sqm.calculate( + # Note: Use high saturation threshold for uint16 test images + _sqm_zenith, 
details_zenith = sqm.calculate( centroids=centroids, solution=solution, image=image, exposure_sec=0.5, altitude_deg=90.0, + saturation_threshold=65000, ) # Calculate at 30° (2× airmass) - sqm_30deg, details_30deg = sqm.calculate( + _sqm_30deg, details_30deg = sqm.calculate( centroids=centroids, solution=solution, image=image, exposure_sec=0.5, altitude_deg=30.0, + saturation_threshold=65000, ) - # Check extinction values - assert details_zenith["extinction_correction"] == pytest.approx(0.28, abs=0.001) - assert details_30deg["extinction_correction"] == pytest.approx(0.56, abs=0.001) + # Check extinction values (ASTAP convention: 0 at zenith) + # Pickering airmass at 30° ≈ 1.995, so extinction ≈ 0.28 * 0.995 ≈ 0.279 + assert details_zenith["extinction_for_altitude"] == pytest.approx(0.0, abs=0.001) + expected_ext_30 = 0.28 * (sqm._pickering_airmass(30.0) - 1) + assert details_30deg["extinction_for_altitude"] == pytest.approx( + expected_ext_30, abs=0.001 + ) - # Raw SQM should be the same (same image, same stars) - assert details_zenith["sqm_raw"] == pytest.approx( - details_30deg["sqm_raw"], abs=0.001 + # Uncorrected SQM should be the same (same image, same stars) + assert details_zenith["sqm_uncorrected"] == pytest.approx( + details_30deg["sqm_uncorrected"], abs=0.001 ) - # Final SQM should differ by extinction difference - extinction_diff = ( - details_30deg["extinction_correction"] - - details_zenith["extinction_correction"] + # sqm_final is raw (no extinction), sqm_altitude_corrected adds extinction + # At zenith: sqm_final == sqm_altitude_corrected (extinction is 0) + assert details_zenith["sqm_final"] == pytest.approx( + details_zenith["sqm_altitude_corrected"], abs=0.001 + ) + + # At 30°: sqm_altitude_corrected = sqm_final + extinction + assert details_30deg["sqm_altitude_corrected"] == pytest.approx( + details_30deg["sqm_final"] + expected_ext_30, abs=0.001 ) - sqm_diff = sqm_30deg - sqm_zenith - assert sqm_diff == pytest.approx(extinction_diff, abs=0.001) def test_calculate_missing_fov(self): """Test that calculate() returns None when FOV is missing""" + np.random.seed(42) sqm = SQM() solution = { @@ -211,6 +303,7 @@ def test_calculate_missing_fov(self): def test_calculate_no_matched_stars(self): """Test that calculate() returns None when no stars are matched""" + np.random.seed(42) sqm = SQM() solution = { @@ -232,6 +325,56 @@ def test_calculate_no_matched_stars(self): assert sqm_value is None assert details == {} + def test_calculate_asymmetric_centroids(self): + """Test with asymmetric centroids to verify (row, col) convention. + + This test uses centroids where row != col to catch any (x,y)/(y,x) confusion. + If the coordinate convention is wrong, stars won't be found at the expected + positions and the calculation will fail or produce incorrect results. 
+ """ + np.random.seed(42) + sqm = SQM() + + # CRITICAL: Use asymmetric coordinates to catch any (x,y)/(y,x) confusion + # Format is (row, col) = (y, x), NOT (x, y) + asymmetric_centroids = np.array([[50, 400], [100, 300], [150, 200]]) + + solution = { + "FOV": 10.0, + "matched_centroids": asymmetric_centroids, + "matched_stars": [ + [45.0, 30.0, 5.0], + [45.1, 30.1, 6.0], + [45.2, 30.2, 7.0], + ], + } + + image = np.random.randint(800, 1200, (512, 512), dtype=np.uint16) + + # Add bright spots at correct positions: image[row, col] + for row, col in asymmetric_centroids: + image[row - 2 : row + 3, col - 2 : col + 3] += 5000 + + result, details = sqm.calculate( + centroids=asymmetric_centroids.tolist(), + solution=solution, + image=image, + exposure_sec=0.5, + altitude_deg=90.0, + saturation_threshold=65000, + ) + + # Should find valid SQM (stars are at correct positions) + assert result is not None, "SQM calculation failed with asymmetric centroids" + assert details["n_matched_stars"] == 3 + + # Verify star fluxes are positive (stars were found at correct positions) + valid_fluxes = [f for f in details["star_fluxes"] if f > 0] + assert len(valid_fluxes) == 3, ( + f"Expected 3 valid star fluxes but got {len(valid_fluxes)}. " + "This may indicate coordinate convention mismatch." + ) + @pytest.mark.unit class TestSQMFieldParameters: @@ -276,3 +419,836 @@ def test_field_parameters_different_fov(self): assert sqm.arcsec_squared_per_pixel == pytest.approx( expected_arcsec_sq_per_pixel, abs=0.01 ), f"Failed for FOV={fov}" + + +@pytest.mark.unit +class TestMzeroCalculation: + """Unit tests for photometric zero point calculation.""" + + def test_mzero_single_star(self): + """Test mzero calculation with a single star.""" + sqm = SQM() + # mzero = mag + 2.5 * log10(flux) + # For flux=1000, mag=5.0: mzero = 5.0 + 2.5*log10(1000) = 5.0 + 7.5 = 12.5 + fluxes = [1000.0] + mags = [5.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + expected = 5.0 + 2.5 * np.log10(1000) + assert mzero == pytest.approx(expected, abs=0.001) + assert len(mzeros) == 1 + assert mzeros[0] == pytest.approx(expected, abs=0.001) + + def test_mzero_flux_weighted_mean(self): + """Test that mzero uses flux-weighted mean (brighter stars weighted more).""" + sqm = SQM() + # Two stars: one bright (high flux), one dim (low flux) + # The bright star's mzero should dominate + fluxes = [10000.0, 100.0] # 100x difference + mags = [4.0, 8.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + # Individual mzeros + mzero_bright = 4.0 + 2.5 * np.log10(10000) # = 14.0 + mzero_dim = 8.0 + 2.5 * np.log10(100) # = 13.0 + + # Flux-weighted: (14.0*10000 + 13.0*100) / (10000+100) ≈ 13.99 + expected_weighted = (mzero_bright * 10000 + mzero_dim * 100) / (10000 + 100) + + assert mzero == pytest.approx(expected_weighted, abs=0.001) + assert mzeros[0] == pytest.approx(mzero_bright, abs=0.001) + assert mzeros[1] == pytest.approx(mzero_dim, abs=0.001) + + def test_mzero_skips_negative_flux(self): + """Test that stars with negative/zero flux are skipped.""" + sqm = SQM() + fluxes = [1000.0, -1.0, 500.0] # Middle star is saturated (flux=-1) + mags = [5.0, 6.0, 5.5] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + # Should only use stars 0 and 2 + assert mzero is not None + assert mzeros[0] is not None + assert mzeros[1] is None # Skipped + assert mzeros[2] is not None + + def test_mzero_all_invalid_returns_none(self): + """Test that all invalid fluxes returns None.""" + sqm = SQM() + fluxes = [-1.0, 0.0, -5.0] + mags = [5.0, 6.0, 
7.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + assert mzero is None + assert all(m is None for m in mzeros) + + +@pytest.mark.unit +class TestApertureOverlapDetection: + """Unit tests for aperture overlap detection.""" + + def test_no_overlap_far_apart(self): + """Test that far apart stars have no overlap.""" + sqm = SQM() + # Two stars 100 pixels apart + centroids = np.array([[100.0, 100.0], [200.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + assert len(excluded) == 0 + + def test_critical_overlap_apertures_touch(self): + """Test CRITICAL overlap when apertures overlap (distance < 2*aperture_radius).""" + sqm = SQM() + # Two stars 8 pixels apart, aperture_radius=5, so 2*5=10 > 8 + centroids = np.array([[100.0, 100.0], [108.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + # Both stars should be excluded + assert 0 in excluded + assert 1 in excluded + + def test_high_overlap_aperture_in_annulus(self): + """Test HIGH overlap when aperture enters another star's annulus.""" + sqm = SQM() + # Stars 15 pixels apart: aperture(5) + annulus_outer(14) = 19 > 15 + # This means star 1's aperture is inside star 0's annulus + centroids = np.array([[100.0, 100.0], [115.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + # Both should be excluded due to HIGH overlap + assert 0 in excluded + assert 1 in excluded + + def test_no_overlap_at_threshold(self): + """Test no overlap when exactly at safe distance.""" + sqm = SQM() + # Stars 20 pixels apart: aperture(5) + annulus_outer(14) = 19 < 20 + centroids = np.array([[100.0, 100.0], [120.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + assert len(excluded) == 0 + + +@pytest.mark.unit +class TestNoiseFloorEstimation: + """Unit tests for adaptive noise floor estimation.""" + + def test_temporal_noise_calculation(self): + """Test temporal noise = read_noise + dark_current * exposure.""" + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # imx296_processed has read_noise=1.5, dark_current=0.0 + + noise = estimator._estimate_temporal_noise(exposure_sec=1.0) + + # With dark_current=0, temporal_noise = read_noise = 1.5 + assert noise == pytest.approx(1.5, abs=0.01) + + def test_noise_floor_uses_theory_when_dark_pixels_below_bias(self): + """Test that theory is used when dark pixels are impossibly low.""" + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # bias_offset for imx296_processed is 6.0 + + # Create image with all pixels below bias offset (impossible in reality) + image = np.full((100, 100), 3.0, dtype=np.float32) + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should use theoretical floor since dark pixels (3.0) < bias (6.0) + assert noise_floor >= estimator.profile.bias_offset + + def test_noise_floor_uses_measured_when_valid(self): + """Test that measured dark pixels are used when valid.""" + np.random.seed(42) + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # bias_offset=6.0, read_noise=1.5, 
dark_current=0.0 + # theoretical_floor = 6.0 + 1.5 = 7.5 + + # Create image with 5th percentile around 8.0 (valid, above bias) + # Most pixels higher, some low pixels around 8 + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + image[0:5, 0:5] = 8.0 # Dark corner + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should use min(measured, theoretical) + # 5th percentile should be around 8, theoretical is 7.5 + # So should use theoretical 7.5 + assert noise_floor == pytest.approx(7.5, abs=0.5) + + def test_noise_floor_clamped_to_bias_offset(self): + """Test that noise floor is never below bias offset.""" + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Even with weird inputs, should never go below bias + image = np.full((100, 100), 100.0, dtype=np.float32) + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + assert noise_floor >= estimator.profile.bias_offset + + def test_history_smoothing_after_multiple_estimates(self): + """Test that history smoothing kicks in after 5+ estimates.""" + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # First 4 estimates - no smoothing yet + for i in range(4): + image = np.full((100, 100), 10.0 + i, dtype=np.float32) + _, details = estimator.estimate_noise_floor(image, exposure_sec=0.5) + assert details["n_history_samples"] == i + 1 + + # 5th estimate - smoothing should kick in + image = np.full((100, 100), 15.0, dtype=np.float32) + _, details = estimator.estimate_noise_floor(image, exposure_sec=0.5) + assert details["n_history_samples"] == 5 + # dark_pixel_smoothed should be median of history, not raw value + # History: [10, 11, 12, 13, 15] -> median = 12 + assert details["dark_pixel_smoothed"] == pytest.approx(12.0, abs=0.1) + + def test_update_with_zero_sec_sample(self): + """Test zero-second sample updates profile gradually.""" + np.random.seed(42) + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + original_bias = estimator.profile.bias_offset + + # Need 3 samples before profile updates + for i in range(3): + # Zero-sec image with bias around 10 + zero_sec = np.random.normal(10.0, 1.5, (100, 100)).astype(np.float32) + estimator.update_with_zero_sec_sample(zero_sec) + + # After 3 samples, profile should update with EMA (alpha=0.2) + # new_bias = 0.2 * measured + 0.8 * original + expected_bias = 0.2 * 10.0 + 0.8 * original_bias + assert estimator.profile.bias_offset == pytest.approx(expected_bias, abs=0.5) + + def test_validate_estimate_too_close_to_median(self): + """Test validation fails when noise floor is too close to image median.""" + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Image where darkest pixels are close to median (uniform image) + # This simulates a situation with no stars/sky gradient + image = np.full((100, 100), 10.0, dtype=np.float32) + + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should be invalid because noise floor (theoretical ~7.5) is close to median (10) + # Actually 7.5 is not > 10 * 0.8 = 8, so let's use a different test + # Need noise floor > median * 0.8 to trigger this + # Create image where dark pixels are above theoretical floor + image2 = np.full((100, 100), 8.0, dtype=np.float32) + estimator2 = NoiseFloorEstimator(camera_type="imx296_processed") + + _, details2 = estimator2.estimate_noise_floor(image2, exposure_sec=0.5) + + # noise_floor will be min(8.0, 7.5) = 7.5 + # median = 8.0, threshold = 8.0 * 0.8 = 6.4 + # 7.5 > 6.4, so should be 
invalid + assert details2["is_valid"] is False + assert "median" in details2["validation_reason"].lower() + + def test_get_statistics(self): + """Test get_statistics returns expected data.""" + np.random.seed(42) + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Do a few estimates + for _ in range(3): + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + stats = estimator.get_statistics() + + assert stats["camera_type"] == "imx296_processed" + assert stats["n_estimates"] == 3 + assert stats["n_history_samples"] == 3 + assert "dark_pixel_mean" in stats + assert "dark_pixel_std" in stats + assert "dark_pixel_median" in stats + + def test_reset_clears_state(self): + """Test reset clears all history and statistics.""" + np.random.seed(42) + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Build up some state + for _ in range(5): + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + assert estimator.n_estimates == 5 + assert len(estimator.dark_pixel_history) == 5 + + # Reset + estimator.reset() + + assert estimator.n_estimates == 0 + assert len(estimator.dark_pixel_history) == 0 + assert len(estimator.zero_sec_history) == 0 + + def test_save_and_load_calibration(self, tmp_path, monkeypatch): + """Test calibration save/load round-trip.""" + # Redirect Path.home() to tmp_path + monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path) + + # Create PiFinder_data directory + (tmp_path / "PiFinder_data").mkdir() + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Save calibration with new values + result = estimator.save_calibration( + bias_offset=25.0, read_noise=3.5, dark_current_rate=0.5 + ) + assert result is True + + # Verify profile was updated + assert estimator.profile.bias_offset == 25.0 + assert estimator.profile.read_noise_adu == 3.5 + assert estimator.profile.dark_current_rate == 0.5 + + # Create new estimator - should load the saved calibration + estimator2 = NoiseFloorEstimator(camera_type="imx296_processed") + + # Should have loaded the saved values (not the defaults) + assert estimator2.profile.bias_offset == 25.0 + assert estimator2.profile.read_noise_adu == 3.5 + assert estimator2.profile.dark_current_rate == 0.5 + + +@pytest.mark.unit +class TestMeasureStarFluxWithLocalBackground: + """Unit tests for SQM._measure_star_flux_with_local_background().""" + + def test_single_star_flux_measurement(self): + """Test flux measurement for a single star with known values.""" + np.random.seed(42) + sqm = SQM() + + # Create image with uniform background of 100 ADU + image = np.full((100, 100), 100.0, dtype=np.float32) + + # Add a star at center (50, 50) with 1000 ADU above background + # Star in 5-pixel radius aperture + y, x = np.ogrid[:100, :100] + dist_sq = (x - 50) ** 2 + (y - 50) ** 2 + star_mask = dist_sq <= 25 # radius 5 + image[star_mask] += 1000 + + centroids = np.array([[50, 50]]) # (row, col) format + + fluxes, backgrounds, n_saturated = sqm._measure_star_flux_with_local_background( + image=image, + centroids=centroids, + aperture_radius=5, + annulus_inner_radius=6, + annulus_outer_radius=14, + saturation_threshold=65000, + ) + + # Background should be ~100 (the uniform background) + assert backgrounds[0] == pytest.approx(100.0, abs=1.0) + + # Flux should be ~1000 * number_of_aperture_pixels + # Aperture area for r=5 is approximately pi*5^2 = 78.5 pixels + assert 
fluxes[0] > 50000 # Should have significant positive flux + assert n_saturated == 0 + + def test_saturated_star_detection(self): + """Test that saturated stars are detected and marked with flux=-1.""" + sqm = SQM() + + # Create image with background + image = np.full((100, 100), 50.0, dtype=np.float32) + + # Add a saturated star (pixel value >= threshold) + image[48:53, 48:53] = 255 # Saturated pixels + + centroids = np.array([[50, 50]]) + + fluxes, backgrounds, n_saturated = sqm._measure_star_flux_with_local_background( + image=image, + centroids=centroids, + aperture_radius=5, + annulus_inner_radius=6, + annulus_outer_radius=14, + saturation_threshold=250, + ) + + assert fluxes[0] == -1 # Saturated stars get flux=-1 + assert n_saturated == 1 + + def test_multiple_stars(self): + """Test flux measurement for multiple stars.""" + np.random.seed(42) + sqm = SQM() + + # Create image with uniform background + image = np.full((200, 200), 80.0, dtype=np.float32) + + # Add two stars at different positions + y, x = np.ogrid[:200, :200] + + # Star 1 at (50, 50) + dist_sq1 = (x - 50) ** 2 + (y - 50) ** 2 + image[dist_sq1 <= 16] += 500 # radius 4 + + # Star 2 at (150, 150) + dist_sq2 = (x - 150) ** 2 + (y - 150) ** 2 + image[dist_sq2 <= 16] += 800 # brighter star + + centroids = np.array([[50, 50], [150, 150]]) + + fluxes, backgrounds, n_saturated = sqm._measure_star_flux_with_local_background( + image=image, + centroids=centroids, + aperture_radius=5, + annulus_inner_radius=6, + annulus_outer_radius=14, + saturation_threshold=65000, + ) + + assert len(fluxes) == 2 + assert len(backgrounds) == 2 + assert fluxes[0] > 0 # Both should have positive flux + assert fluxes[1] > 0 + assert fluxes[1] > fluxes[0] # Star 2 should be brighter + assert n_saturated == 0 + + def test_star_near_edge(self): + """Test flux measurement for a star near image edge.""" + sqm = SQM() + + # Create small image + image = np.full((50, 50), 60.0, dtype=np.float32) + + # Add star near edge + image[5:10, 5:10] += 300 + + centroids = np.array([[7, 7]]) + + fluxes, backgrounds, n_saturated = sqm._measure_star_flux_with_local_background( + image=image, + centroids=centroids, + aperture_radius=3, + annulus_inner_radius=4, + annulus_outer_radius=8, + saturation_threshold=65000, + ) + + # Should still return a result (clipped to image bounds) + assert len(fluxes) == 1 + assert fluxes[0] > 0 + + def test_local_background_varies_per_star(self): + """Test that each star uses its own local background.""" + sqm = SQM() + + # Create image with gradient background + image = np.zeros((200, 200), dtype=np.float32) + for i in range(200): + image[i, :] = 50 + i * 0.5 # Gradient from 50 to 150 + + # Add stars in different background regions + y, x = np.ogrid[:200, :200] + + # Star 1 in low background region (row 30) + dist_sq1 = (x - 100) ** 2 + (y - 30) ** 2 + image[dist_sq1 <= 16] += 500 + + # Star 2 in high background region (row 170) + dist_sq2 = (x - 100) ** 2 + (y - 170) ** 2 + image[dist_sq2 <= 16] += 500 + + centroids = np.array([[30, 100], [170, 100]]) + + fluxes, backgrounds, n_saturated = sqm._measure_star_flux_with_local_background( + image=image, + centroids=centroids, + aperture_radius=5, + annulus_inner_radius=6, + annulus_outer_radius=14, + saturation_threshold=65000, + ) + + # Backgrounds should be different (local to each star) + assert backgrounds[0] < backgrounds[1] + # Background at row 30 should be ~65, at row 170 should be ~135 + assert backgrounds[0] == pytest.approx(65, abs=10) + assert backgrounds[1] == pytest.approx(135, 
abs=10) + + +@pytest.mark.unit +class TestGetCameraProfile: + """Unit tests for camera_profiles.get_camera_profile().""" + + def test_get_known_camera_profile(self): + """Test getting profiles for known camera types.""" + profile = get_camera_profile("imx296") + + assert isinstance(profile, CameraProfile) + assert profile.format == "R10" + assert profile.bit_depth == 10 + assert profile.analog_gain == 15.0 + + def test_get_processed_camera_profile(self): + """Test getting processed (8-bit) camera profiles.""" + profile = get_camera_profile("imx296_processed") + + assert isinstance(profile, CameraProfile) + assert profile.bit_depth == 8 + assert profile.format == "L" + + def test_get_all_known_profiles(self): + """Test that all documented camera types are accessible.""" + camera_types = [ + "imx296", + "imx462", + "imx290", + "hq", + "imx296_processed", + "imx462_processed", + "imx290_processed", + "hq_processed", + ] + + for camera_type in camera_types: + profile = get_camera_profile(camera_type) + assert isinstance(profile, CameraProfile) + assert profile.raw_size is not None + + def test_unknown_camera_raises_error(self): + """Test that unknown camera type raises ValueError.""" + with pytest.raises(ValueError) as exc_info: + get_camera_profile("unknown_camera") + + assert "unknown_camera" in str(exc_info.value).lower() + assert "available" in str(exc_info.value).lower() + + +@pytest.mark.unit +class TestDetectCameraType: + """Unit tests for camera_profiles.detect_camera_type().""" + + def test_detect_imx296(self): + """Test detection of IMX296 sensor.""" + assert detect_camera_type("imx296") == "imx296" + assert detect_camera_type("IMX296") == "imx296" + assert detect_camera_type("imx296_mono") == "imx296" + + def test_detect_imx290_maps_to_imx462(self): + """Test that IMX290 maps to IMX462 profile (driver compatibility).""" + assert detect_camera_type("imx290") == "imx462" + assert detect_camera_type("IMX290") == "imx462" + + def test_detect_imx477_maps_to_hq(self): + """Test that IMX477 maps to HQ profile.""" + assert detect_camera_type("imx477") == "hq" + assert detect_camera_type("IMX477") == "hq" + assert detect_camera_type("raspberry_pi_imx477") == "hq" + + def test_unknown_hardware_raises_error(self): + """Test that unknown hardware ID raises ValueError.""" + with pytest.raises(ValueError) as exc_info: + detect_camera_type("unknown_sensor") + + assert "unknown_sensor" in str(exc_info.value).lower() + assert "supported" in str(exc_info.value).lower() + + +@pytest.mark.unit +class TestCropAndRotate: + """Unit tests for CameraProfile.crop_and_rotate().""" + + def test_no_crop_no_rotation(self): + """Test with no cropping and no rotation.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 100), + analog_gain=1.0, + crop_y=(0, 0), + crop_x=(0, 0), + rotation_90=0, + ) + + arr = np.arange(100 * 100).reshape(100, 100) + result = profile.crop_and_rotate(arr) + + assert result.shape == (100, 100) + np.testing.assert_array_equal(result, arr) + + def test_vertical_crop(self): + """Test vertical (y) cropping.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 100), + analog_gain=1.0, + crop_y=(10, 20), # Remove 10 from top, 20 from bottom + crop_x=(0, 0), + rotation_90=0, + ) + + arr = np.arange(100 * 100).reshape(100, 100) + result = profile.crop_and_rotate(arr) + + assert result.shape == (70, 100) # 100 - 10 - 20 = 70 + np.testing.assert_array_equal(result, arr[10:-20, :]) + + def test_horizontal_crop(self): + """Test horizontal (x) cropping.""" + profile = 
CameraProfile( + format="R10", + raw_size=(100, 100), + analog_gain=1.0, + crop_y=(0, 0), + crop_x=(15, 25), # Remove 15 from left, 25 from right + rotation_90=0, + ) + + arr = np.arange(100 * 100).reshape(100, 100) + result = profile.crop_and_rotate(arr) + + assert result.shape == (100, 60) # 100 - 15 - 25 = 60 + np.testing.assert_array_equal(result, arr[:, 15:-25]) + + def test_both_crops(self): + """Test both vertical and horizontal cropping.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 100), + analog_gain=1.0, + crop_y=(5, 10), + crop_x=(10, 15), + rotation_90=0, + ) + + arr = np.arange(100 * 100).reshape(100, 100) + result = profile.crop_and_rotate(arr) + + assert result.shape == (85, 75) # (100-5-10, 100-10-15) + np.testing.assert_array_equal(result, arr[5:-10, 10:-15]) + + def test_rotation_90(self): + """Test 90-degree counter-clockwise rotation.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 50), + analog_gain=1.0, + crop_y=(0, 0), + crop_x=(0, 0), + rotation_90=1, # 90° CCW + ) + + arr = np.arange(100 * 50).reshape(100, 50) + result = profile.crop_and_rotate(arr) + + assert result.shape == (50, 100) # Dimensions swapped + np.testing.assert_array_equal(result, np.rot90(arr, 1)) + + def test_rotation_180(self): + """Test 180-degree rotation.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 100), + analog_gain=1.0, + crop_y=(0, 0), + crop_x=(0, 0), + rotation_90=2, # 180° + ) + + arr = np.arange(100 * 100).reshape(100, 100) + result = profile.crop_and_rotate(arr) + + assert result.shape == (100, 100) + np.testing.assert_array_equal(result, np.rot90(arr, 2)) + + def test_crop_then_rotate(self): + """Test that crop is applied before rotation.""" + profile = CameraProfile( + format="R10", + raw_size=(100, 80), + analog_gain=1.0, + crop_y=(10, 10), # 100 -> 80 + crop_x=(0, 0), + rotation_90=1, # 90° CCW + ) + + arr = np.arange(100 * 80).reshape(100, 80) + result = profile.crop_and_rotate(arr) + + # After crop: (80, 80), after 90° rotation: (80, 80) + assert result.shape == (80, 80) + + # Verify correct order: crop first, then rotate + expected = np.rot90(arr[10:-10, :], 1) + np.testing.assert_array_equal(result, expected) + + def test_imx296_profile_crop_and_rotate(self): + """Test with actual IMX296 profile settings.""" + profile = get_camera_profile("imx296") + + # Create array matching IMX296 raw size + arr = np.ones(profile.raw_size[::-1], dtype=np.uint16) # (height, width) + + result = profile.crop_and_rotate(arr) + + # IMX296 crops 184 from each side horizontally and rotates 180° + # Original: (1088, 1456) -> crop to (1088, 1088) -> same after 180° rotation + expected_width = profile.raw_size[0] - profile.crop_x[0] - profile.crop_x[1] + expected_height = profile.raw_size[1] - profile.crop_y[0] - profile.crop_y[1] + + # After rotation, if 180°, dimensions stay same + assert result.shape == (expected_height, expected_width) + + +@pytest.mark.unit +class TestSaveSweepMetadata: + """Unit tests for save_sweep_metadata.save_sweep_metadata().""" + + def test_save_minimal_metadata(self, tmp_path): + """Test saving metadata with only required fields.""" + sweep_dir = tmp_path / "sweep_test" + sweep_dir.mkdir() + + result = save_sweep_metadata( + sweep_dir=sweep_dir, + observer_lat=50.85, + observer_lon=4.35, + ) + + assert result == sweep_dir / "sweep_metadata.json" + assert result.exists() + + with open(result) as f: + data = json.load(f) + + assert data["observer"]["latitude_deg"] == 50.85 + assert data["observer"]["longitude_deg"] == 4.35 
+ assert "timestamp" in data + assert data["sweep_directory"] == str(sweep_dir) + + def test_save_full_metadata(self, tmp_path): + """Test saving metadata with all optional fields.""" + sweep_dir = tmp_path / "sweep_full" + sweep_dir.mkdir() + + result = save_sweep_metadata( + sweep_dir=sweep_dir, + observer_lat=50.85, + observer_lon=4.35, + observer_altitude_m=100.0, + gps_datetime="2024-01-15T22:30:00+00:00", + reference_sqm=21.5, + ra_deg=180.0, + dec_deg=45.0, + altitude_deg=60.0, + azimuth_deg=90.0, + notes="Clear sky, no moon", + ) + + with open(result) as f: + data = json.load(f) + + assert data["observer"]["altitude_m"] == 100.0 + assert data["timestamp"] == "2024-01-15T22:30:00+00:00" + assert data["reference_sqm"] == 21.5 + assert data["coordinates"]["ra_deg"] == 180.0 + assert data["coordinates"]["dec_deg"] == 45.0 + assert data["coordinates"]["altitude_deg"] == 60.0 + assert data["coordinates"]["azimuth_deg"] == 90.0 + assert data["notes"] == "Clear sky, no moon" + + def test_save_partial_coordinates(self, tmp_path): + """Test saving metadata with only RA/Dec (no alt/az).""" + sweep_dir = tmp_path / "sweep_partial" + sweep_dir.mkdir() + + save_sweep_metadata( + sweep_dir=sweep_dir, + observer_lat=50.85, + observer_lon=4.35, + ra_deg=120.0, + dec_deg=30.0, + ) + + with open(sweep_dir / "sweep_metadata.json") as f: + data = json.load(f) + + assert data["coordinates"]["ra_deg"] == 120.0 + assert data["coordinates"]["dec_deg"] == 30.0 + assert "altitude_deg" not in data["coordinates"] + assert "azimuth_deg" not in data["coordinates"] + + def test_coordinates_require_both_ra_dec(self, tmp_path): + """Test that coordinates block requires both RA and Dec.""" + sweep_dir = tmp_path / "sweep_ra_only" + sweep_dir.mkdir() + + # Only RA, no Dec - should not create coordinates block + save_sweep_metadata( + sweep_dir=sweep_dir, + observer_lat=50.85, + observer_lon=4.35, + ra_deg=120.0, # Only RA, no Dec + ) + + with open(sweep_dir / "sweep_metadata.json") as f: + data = json.load(f) + + assert "coordinates" not in data + + def test_uses_gps_datetime_when_provided(self, tmp_path): + """Test that provided GPS datetime is used instead of current time.""" + sweep_dir = tmp_path / "sweep_gps" + sweep_dir.mkdir() + + gps_time = "2024-06-15T12:00:00+00:00" + save_sweep_metadata( + sweep_dir=sweep_dir, + observer_lat=50.85, + observer_lon=4.35, + gps_datetime=gps_time, + ) + + with open(sweep_dir / "sweep_metadata.json") as f: + data = json.load(f) + + assert data["timestamp"] == gps_time + + def test_nonexistent_directory_raises_error(self, tmp_path): + """Test that saving to nonexistent directory raises an error.""" + nonexistent = tmp_path / "does_not_exist" + + with pytest.raises(Exception): # FileNotFoundError or similar + save_sweep_metadata( + sweep_dir=nonexistent, + observer_lat=50.85, + observer_lon=4.35, + )