diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index ffa799142..000000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(find:*)", - "Bash(jj log:*)", - "Bash(jj status:*)", - "Bash(jj show:*)", - "Bash(jj file list:*)", - "Bash(ls:*)", - "Bash(git checkout:*)", - "Bash(git pull:*)", - "Bash(jj branch:*)", - "Bash(jj git fetch:*)", - "Bash(jj help:*)", - "Bash(jj bookmark:*)", - "Bash(jj describe:*)", - "Bash(jj rebase:*)", - "WebFetch(domain:localhost)", - "mcp__playwright__browser_navigate", - "mcp__playwright__browser_type", - "mcp__playwright__browser_click", - "mcp__playwright__browser_take_screenshot", - "mcp__playwright__browser_snapshot", - "mcp__playwright__browser_press_key", - "mcp__playwright__browser_close", - "Bash(pkill:*)", - "Bash(source:*)", - "Bash(python:*)", - "Bash(pip install:*)", - "Bash(timeout 10 python:*)", - "Bash(grep:*)", - "Bash(nox:*)", - "Bash(# Navigate to python dir and set up venv\nPYTHON_DIR=\"\"/Users/mike/dev/amateur_astro/myPiFinder/wt-radec/python\"\"\ncd \"\"$PYTHON_DIR\"\"\npython3.9 -m venv .venv\nsource .venv/bin/activate\npip install -r requirements.txt\npip install -r requirements_dev.txt)", - "Bash(PYTHON_DIR=\"/Users/mike/dev/amateur_astro/myPiFinder/wt-radec/python\")", - "Bash(pytest:*)", - "Bash(sed:*)", - ], - "deny": [] - } -} diff --git a/python/PiFinder/auto_exposure.py b/python/PiFinder/auto_exposure.py index ea6d05afc..eadcc4f61 100644 --- a/python/PiFinder/auto_exposure.py +++ b/python/PiFinder/auto_exposure.py @@ -121,10 +121,10 @@ def __init__( self._repeat_count = 0 # Sweep pattern: exposure values in microseconds - # Uses doubling pattern (2x each step) + # Start at 400ms (reasonable middle), sweep up, then try shorter exposures # Note: This is intentionally NOT using generate_exposure_sweep() because - # it uses a specific doubling pattern - self._exposures = [25000, 50000, 100000, 200000, 400000, 800000, 1000000] + # it uses a specific pattern optimized for recovery + self._exposures = [400000, 800000, 1000000, 200000, 100000, 50000, 25000] self._repeats_per_exposure = 2 # Try each exposure 2 times logger.info( @@ -619,6 +619,183 @@ def reset(self) -> None: logger.debug("HistogramZeroStarHandler reset") +class ExposureSNRController: + """ + SNR-based auto exposure for SQM measurements. + + Targets a minimum background SNR and exposure time instead of star count. + This provides more stable, longer exposures that are better for accurate + SQM measurements compared to the histogram-based approach. + + Strategy: + - Target specific background level above noise floor + - Derive thresholds from camera profile (bit depth, bias offset) + - Slower adjustments for stability + """ + + def __init__( + self, + min_exposure: int = 10000, # 10ms minimum + max_exposure: int = 1000000, # 1.0s maximum + target_background: int = 30, # Target background level in ADU + min_background: int = 15, # Minimum acceptable background + max_background: int = 100, # Maximum before saturating + adjustment_factor: float = 1.3, # Gentle adjustments (30% steps) + ): + """ + Initialize SNR-based auto exposure. 
+ + Args: + min_exposure: Minimum exposure in microseconds (default 10ms) + max_exposure: Maximum exposure in microseconds (default 1000ms) + target_background: Target median background level in ADU + min_background: Minimum acceptable background (increase if below) + max_background: Maximum acceptable background (decrease if above) + adjustment_factor: Multiplicative adjustment step (default 1.3 = 30%) + """ + self.min_exposure = min_exposure + self.max_exposure = max_exposure + self.target_background = target_background + self.min_background = min_background + self.max_background = max_background + self.adjustment_factor = adjustment_factor + + logger.info( + f"AutoExposure SNR: target_bg={target_background}, " + f"range=[{min_background}, {max_background}] ADU, " + f"exp_range=[{min_exposure/1000:.0f}, {max_exposure/1000:.0f}]ms, " + f"adjustment={adjustment_factor}x" + ) + + @classmethod + def from_camera_profile( + cls, + camera_type: str, + min_exposure: int = 10000, + max_exposure: int = 1000000, + adjustment_factor: float = 1.3, + ) -> "ExposureSNRController": + """ + Create controller with thresholds derived from camera profile. + + Calculates min/target/max background based on bit depth and bias offset. + + Args: + camera_type: Camera type (e.g., "imx296_processed", "imx462_processed") + min_exposure: Minimum exposure in microseconds + max_exposure: Maximum exposure in microseconds + adjustment_factor: Multiplicative adjustment step + + Returns: + ExposureSNRController configured for the camera + """ + from PiFinder.sqm.camera_profiles import get_camera_profile + + profile = get_camera_profile(camera_type) + + # Derive thresholds from camera specs + max_adu = (2 ** profile.bit_depth) - 1 + bias = profile.bias_offset + + # min_background: bias + margin (2x bias or bias + 8, whichever larger) + min_background = int(max(bias * 2, bias + 8)) + + # max_background: ~40% of full range (avoid saturation/nonlinearity) + max_background = int(max_adu * 0.4) + + # target_background: just above min (lower = shorter exposure = more linear response) + target_background = min_background + 2 + + logger.info( + f"SNR controller from {camera_type}: " + f"bit_depth={profile.bit_depth}, bias={bias:.0f}, " + f"thresholds=[{min_background}, {target_background}, {max_background}]" + ) + + return cls( + min_exposure=min_exposure, + max_exposure=max_exposure, + target_background=target_background, + min_background=min_background, + max_background=max_background, + adjustment_factor=adjustment_factor, + ) + + def update( + self, + current_exposure: int, + image: Image.Image, + noise_floor: Optional[float] = None, + **kwargs # Ignore other params (matched_stars, etc.) + ) -> Optional[int]: + """ + Update exposure based on background level. 
+ + Args: + current_exposure: Current exposure in microseconds + image: Current image for analysis + noise_floor: Adaptive noise floor from SQM calculator (if available) + **kwargs: Ignored (for compatibility with PID interface) + + Returns: + New exposure in microseconds, or None if no change needed + """ + # Use adaptive noise floor if available, otherwise fall back to static config + # Need margin above noise floor so background_corrected isn't near zero + if noise_floor is not None: + min_bg = noise_floor + 2 + else: + min_bg = self.min_background + + # Analyze image + if image.mode != "L": + image = image.convert("L") + img_array = np.asarray(image, dtype=np.float32) + + # Use 10th percentile as background estimate (dark pixels) + background = float(np.percentile(img_array, 10)) + + logger.debug( + f"SNR AE: bg={background:.1f}, min={min_bg:.1f} ADU, exp={current_exposure/1000:.0f}ms" + ) + + # Determine adjustment + new_exposure = None + + if background < min_bg: + # Too dark - increase exposure + new_exposure = int(current_exposure * self.adjustment_factor) + logger.info( + f"SNR AE: Background too low ({background:.1f} < {min_bg:.1f}), " + f"increasing exposure {current_exposure/1000:.0f}ms → {new_exposure/1000:.0f}ms" + ) + elif background > self.max_background: + # Too bright - decrease exposure + new_exposure = int(current_exposure / self.adjustment_factor) + logger.info( + f"SNR AE: Background too high ({background:.1f} > {self.max_background}), " + f"decreasing exposure {current_exposure/1000:.0f}ms → {new_exposure/1000:.0f}ms" + ) + else: + # Background is in acceptable range + logger.debug(f"SNR AE: OK (bg={background:.1f} ADU)") + return None + + # Clamp to limits + new_exposure = max(self.min_exposure, min(self.max_exposure, new_exposure)) + return new_exposure + + def get_status(self) -> dict: + return { + "mode": "SNR", + "target_background": self.target_background, + "min_background": self.min_background, + "max_background": self.max_background, + "min_exposure": self.min_exposure, + "max_exposure": self.max_exposure, + } + + class ExposurePIDController: """ PID controller for automatic camera exposure adjustment. 
@@ -664,6 +841,7 @@ def __init__( self._integral = 0.0 self._last_error: Optional[float] = None self._zero_star_count = 0 + self._nonzero_star_count = 0 # Hysteresis: consecutive non-zero solves self._last_adjustment_time = 0.0 self._zero_star_handler = zero_star_handler or SweepZeroStarHandler( min_exposure=min_exposure, max_exposure=max_exposure @@ -680,6 +858,7 @@ def reset(self) -> None: self._integral = 0.0 self._last_error = None self._zero_star_count = 0 + self._nonzero_star_count = 0 self._last_adjustment_time = 0.0 self._zero_star_handler.reset() logger.debug("PID controller reset") @@ -725,6 +904,16 @@ def _update_pid(self, matched_stars: int, current_exposure: int) -> Optional[int # Select gains: conservative when decreasing, aggressive when increasing kp, ki, kd = self.gains_decrease if error < 0 else self.gains_increase + # Reset integral when error changes sign to prevent accumulated integral + # from crashing exposure when conditions change suddenly + # (e.g., going from too many stars to too few stars) + if self._last_error is not None: + if (error > 0 and self._last_error < 0) or (error < 0 and self._last_error > 0): + logger.debug( + f"PID: Error sign changed ({self._last_error:.0f} → {error:.0f}), resetting integral" + ) + self._integral = 0.0 + # PID calculation p_term = kp * error diff --git a/python/PiFinder/camera_interface.py b/python/PiFinder/camera_interface.py index 009fc6579..9752f55c0 100644 --- a/python/PiFinder/camera_interface.py +++ b/python/PiFinder/camera_interface.py @@ -21,12 +21,14 @@ from PiFinder import state_utils, utils from PiFinder.auto_exposure import ( ExposurePIDController, + ExposureSNRController, SweepZeroStarHandler, ExponentialSweepZeroStarHandler, ResetZeroStarHandler, HistogramZeroStarHandler, generate_exposure_sweep, ) +from PiFinder.sqm.camera_profiles import detect_camera_type logger = logging.getLogger("Camera.Interface") @@ -37,7 +39,9 @@ class CameraInterface: _camera_started = False _save_next_to = None # Filename to save next capture to (None = don't save) _auto_exposure_enabled = False + _auto_exposure_mode = "pid" # "pid" or "snr" _auto_exposure_pid: Optional[ExposurePIDController] = None + _auto_exposure_snr: Optional[ExposureSNRController] = None _last_solve_time: Optional[float] = None def initialize(self) -> None: @@ -114,13 +118,18 @@ def get_image_loop( root_dir, "test_images", "pifinder_debug_02.png" ) - # 60 half-second cycles + # 60 half-second cycles (30 seconds between captures in sleep mode) sleep_delay = 60 + was_sleeping = False while True: sleeping = state_utils.sleep_for_framerate( shared_state, limit_framerate=False ) if sleeping: + # Log transition to sleep mode + if not was_sleeping: + logger.info("Camera entering low-power sleep mode") + was_sleeping = True # Even in sleep mode, we want to take photos every # so often to update positions sleep_delay -= 1 @@ -128,6 +137,10 @@ def get_image_loop( continue else: sleep_delay = 60 + logger.debug("Sleep mode: waking for periodic capture") + elif was_sleeping: + logger.info("Camera exiting low-power sleep mode") + was_sleeping = False imu_start = shared_state.imu() image_start_time = time.time() @@ -213,11 +226,35 @@ def get_image_loop( f"RMSE: {rmse_str}, Current exposure: {self.exposure_time}µs" ) - # Call PID update (now handles zero stars with recovery mode) - # Pass base_image for histogram analysis in zero-star handler - new_exposure = self._auto_exposure_pid.update( - matched_stars, self.exposure_time, base_image - ) + # Call auto-exposure update based on 
current mode + if self._auto_exposure_mode == "snr": + # SNR mode: use background-based controller (for SQM measurements) + if self._auto_exposure_snr is None: + # Use camera profile to derive thresholds + try: + cam_type = detect_camera_type(self.get_cam_type()) + cam_type = f"{cam_type}_processed" + self._auto_exposure_snr = ( + ExposureSNRController.from_camera_profile(cam_type) + ) + except ValueError as e: + # Unknown camera, use defaults + logger.warning( + f"Camera detection failed: {e}, using default SNR thresholds" + ) + self._auto_exposure_snr = ExposureSNRController() + # Get adaptive noise floor from shared state + adaptive_noise_floor = self.shared_state.noise_floor() + new_exposure = self._auto_exposure_snr.update( + self.exposure_time, base_image, + noise_floor=adaptive_noise_floor + ) + else: + # PID mode: use star-count based controller (default) + # Pass base_image for histogram analysis in zero-star handler + new_exposure = self._auto_exposure_pid.update( + matched_stars, self.exposure_time, base_image + ) if ( new_exposure is not None @@ -332,6 +369,19 @@ def get_image_loop( "Cannot set AE handler: auto-exposure not initialized" ) + if command.startswith("set_ae_mode"): + mode = command.split(":")[1] + if mode in ["pid", "snr"]: + self._auto_exposure_mode = mode + console_queue.put(f"CAM: AE Mode={mode.upper()}") + logger.info( + f"Auto-exposure mode changed to: {mode.upper()}" + ) + else: + logger.warning( + f"Unknown auto-exposure mode: {mode} (valid: pid, snr)" + ) + if command == "exp_up" or command == "exp_dn": # Manual exposure adjustments disable auto-exposure self._auto_exposure_enabled = False @@ -416,11 +466,11 @@ def get_image_loop( # Disable auto-exposure during sweep self._auto_exposure_enabled = False - # Generate 100 exposure values with logarithmic spacing + # Generate 20 exposure values with logarithmic spacing # from 25ms (25000µs) to 1s (1000000µs) min_exp = 25000 # 25ms max_exp = 1000000 # 1s - num_images = 100 + num_images = 20 # Generate logarithmic sweep using shared utility sweep_exposures = generate_exposure_sweep( diff --git a/python/PiFinder/solver.py b/python/PiFinder/solver.py index 842cf683b..4d8b2e21d 100644 --- a/python/PiFinder/solver.py +++ b/python/PiFinder/solver.py @@ -35,40 +35,18 @@ def create_sqm_calculator(shared_state): - """Create a new SQM calculator instance for PROCESSED images with current calibration.""" - # Get camera type from shared state and use "_processed" profile - # since images are already processed 8-bit (not raw) + """Create a new SQM calculator instance with current calibration.""" camera_type = shared_state.camera_type() camera_type_processed = f"{camera_type}_processed" - logger.info( - f"Creating processed SQM calculator for camera: {camera_type_processed}" - ) - - return SQMCalculator( - camera_type=camera_type_processed, - use_adaptive_noise_floor=True, - ) - + logger.info(f"Creating SQM calculator for camera: {camera_type_processed}") -def create_sqm_calculator_raw(shared_state): - """Create a new SQM calculator instance for RAW 16-bit images with current calibration.""" - # Get camera type from shared state (raw profile, e.g., "imx296", "hq") - camera_type_raw = shared_state.camera_type() + return SQMCalculator(camera_type=camera_type_processed) - logger.info(f"Creating raw SQM calculator for camera: {camera_type_raw}") - return SQMCalculator( - camera_type=camera_type_raw, - use_adaptive_noise_floor=True, - ) - - -def update_sqm_dual_pipeline( +def update_sqm( shared_state, sqm_calculator, - 
sqm_calculator_raw, - camera_command_queue, centroids, solution, image_processed, @@ -80,22 +58,14 @@ def update_sqm_dual_pipeline( annulus_outer_radius=14, ): """ - Calculate SQM for BOTH processed (8-bit) and raw (16-bit) images. - - This function: - 1. Checks if enough time has passed since last update - 2. Calculates SQM from processed 8-bit image - 3. Captures a raw 16-bit frame, loads it, and calculates raw SQM - 4. Updates shared state with both values + Calculate SQM from image. Args: shared_state: SharedStateObj instance - sqm_calculator: SQM calculator for processed images - sqm_calculator_raw: SQM calculator for raw images - camera_command_queue: Queue to send raw capture command + sqm_calculator: SQM calculator instance centroids: List of detected star centroids solution: Tetra3 solve solution with matched stars - image_processed: Processed 8-bit image array + image_processed: Processed image array (numpy) exposure_sec: Exposure time in seconds altitude_deg: Altitude in degrees for extinction correction calculation_interval_seconds: Minimum time between calculations (default: 5.0) @@ -131,8 +101,8 @@ def update_sqm_dual_pipeline( return False try: - # ========== Calculate PROCESSED (8-bit) SQM ========== - sqm_value_processed, _ = sqm_calculator.calculate( + # Calculate SQM from image + sqm_value, details = sqm_calculator.calculate( centroids=centroids, solution=solution, image=image_processed, @@ -143,48 +113,29 @@ def update_sqm_dual_pipeline( annulus_outer_radius=annulus_outer_radius, ) - # ========== Calculate RAW (16-bit) SQM from shared state ========== - sqm_value_raw = None - - try: - # Get raw frame from shared state (already captured by camera) - raw_array = shared_state.cam_raw() - - if raw_array is not None: - raw_array = np.asarray(raw_array, dtype=np.float32) - - # Calculate raw SQM - sqm_value_raw, _ = sqm_calculator_raw.calculate( - centroids=centroids, - solution=solution, - image=raw_array, - exposure_sec=exposure_sec, - altitude_deg=altitude_deg, - aperture_radius=aperture_radius, - annulus_inner_radius=annulus_inner_radius, - annulus_outer_radius=annulus_outer_radius, - ) - - except Exception as e: - logger.warning(f"Failed to calculate raw SQM: {e}") - # Continue with just processed SQM - - # ========== Update shared state with BOTH values ========== - if sqm_value_processed is not None: + # Update noise floor in shared state (for SNR auto-exposure) + noise_floor_details = details.get("noise_floor_details") + if noise_floor_details and "noise_floor_adu" in noise_floor_details: + shared_state.set_noise_floor(noise_floor_details["noise_floor_adu"]) + + # Store SQM details (filter out large per-star arrays) + filtered_details = { + k: v + for k, v in details.items() + if k + not in ("star_centroids", "star_mags", "star_fluxes", "star_local_backgrounds", "star_mzeros") + } + shared_state.set_sqm_details(filtered_details) + + # Update shared state + if sqm_value is not None: new_sqm_state = SQMState( - value=sqm_value_processed, - value_raw=sqm_value_raw, # May be None if raw failed + value=sqm_value, source="Calculated", last_update=datetime.now().isoformat(), ) shared_state.set_sqm(new_sqm_state) - - raw_str = ( - f", raw={sqm_value_raw:.2f}" - if sqm_value_raw is not None - else ", raw=N/A" - ) - logger.info(f"SQM updated: processed={sqm_value_processed:.2f}{raw_str}") + logger.info(f"SQM updated: {sqm_value:.2f} mag/arcsec²") return True except Exception as e: @@ -194,6 +145,12 @@ def update_sqm_dual_pipeline( return False +class 
CedarConnectionError(Exception): + """Raised when Cedar gRPC connection fails.""" + + pass + + class PFCedarDetectClient(cedar_detect_client.CedarDetectClient): def __init__(self, port=50551): """Set up the client without spawning the server as we @@ -218,6 +175,60 @@ def _get_stub(self): ) return self._stub + def extract_centroids(self, image, sigma, max_size, use_binned, detect_hot_pixels=True): + """Override to raise CedarConnectionError on gRPC failure instead of returning empty list.""" + import numpy as np + from tetra3 import cedar_detect_pb2 + + np_image = np.asarray(image, dtype=np.uint8) + (height, width) = np_image.shape + centroids_result = None + + # Use shared memory path (same machine) + if self._use_shmem: + self._alloc_shmem(size=width * height) + shimg = np.ndarray(np_image.shape, dtype=np_image.dtype, buffer=self._shmem.buf) + shimg[:] = np_image[:] + + im = cedar_detect_pb2.Image(width=width, height=height, shmem_name=self._shmem.name) + req = cedar_detect_pb2.CentroidsRequest( + input_image=im, + sigma=sigma, + max_size=max_size, + return_binned=False, + use_binned_for_star_candidates=use_binned, + detect_hot_pixels=detect_hot_pixels, + ) + try: + centroids_result = self._get_stub().ExtractCentroids(req) + except grpc.RpcError as err: + if err.code() == grpc.StatusCode.INTERNAL: + # Shared memory issue, fall back to non-shmem + self._del_shmem() + self._use_shmem = False + else: + raise CedarConnectionError(f"Cedar gRPC failed: {err.details()}") from err + + if not self._use_shmem: + im = cedar_detect_pb2.Image(width=width, height=height, image_data=np_image.tobytes()) + req = cedar_detect_pb2.CentroidsRequest( + input_image=im, + sigma=sigma, + max_size=max_size, + return_binned=False, + use_binned_for_star_candidates=use_binned, + ) + try: + centroids_result = self._get_stub().ExtractCentroids(req) + except grpc.RpcError as err: + raise CedarConnectionError(f"Cedar gRPC failed: {err.details()}") from err + + tetra_centroids = [] + if centroids_result is not None: + for sc in centroids_result.star_candidates: + tetra_centroids.append((sc.centroid_position.y, sc.centroid_position.x)) + return tetra_centroids + def __del__(self): self._del_shmem() @@ -272,13 +283,13 @@ def solver( centroids = [] log_no_stars_found = True - # Create SQM calculators (processed and raw) - can be reloaded via command queue + # Create SQM calculator - can be reloaded via command queue sqm_calculator = create_sqm_calculator(shared_state) - sqm_calculator_raw = create_sqm_calculator_raw(shared_state) while True: logger.info("Starting Solver Loop") - # Start cedar detect server + # Try to start cedar detect server, fall back to tetra3 centroider if unavailable + cedar_detect = None try: cedar_detect = PFCedarDetectClient() except FileNotFoundError as e: @@ -286,10 +297,8 @@ def solver( "Not using cedar_detect, as corresponding file '%s' could not be found", e.filename, ) - cedar_detect = None except ValueError: logger.exception("Not using cedar_detect") - cedar_detect = None try: while True: @@ -316,12 +325,9 @@ def solver( align_dec = 0 if command[0] == "reload_sqm_calibration": - logger.info( - "Reloading SQM calibration (both processed and raw)..." 
- ) + logger.info("Reloading SQM calibration...") sqm_calculator = create_sqm_calculator(shared_state) - sqm_calculator_raw = create_sqm_calculator_raw(shared_state) - logger.info("SQM calibration reloaded for both pipelines") + logger.info("SQM calibration reloaded") state_utils.sleep_for_framerate(shared_state) @@ -357,13 +363,18 @@ def solver( ] t0 = precision_timestamp() - if cedar_detect is None: - # Use old tetr3 centroider - centroids = tetra3.get_centroids_from_image(np_image) + if cedar_detect is not None: + # Try Cedar first + try: + centroids = cedar_detect.extract_centroids( + np_image, sigma=8, max_size=10, use_binned=True + ) + except CedarConnectionError as e: + logger.warning(f"Cedar connection failed: {e}, falling back to tetra3") + centroids = tetra3.get_centroids_from_image(np_image) else: - centroids = cedar_detect.extract_centroids( - np_image, sigma=8, max_size=10, use_binned=True - ) + # Cedar not available, use tetra3 + centroids = tetra3.get_centroids_from_image(np_image) t_extract = (precision_timestamp() - t0) * 1000 logger.debug( @@ -408,11 +419,9 @@ def solver( last_image_metadata["exposure_time"] / 1_000_000.0 ) - update_sqm_dual_pipeline( + update_sqm( shared_state=shared_state, sqm_calculator=sqm_calculator, - sqm_calculator_raw=sqm_calculator_raw, - camera_command_queue=camera_command_queue, centroids=centroids, solution=solution, image_processed=np_image, @@ -430,7 +439,7 @@ def solver( solved |= solution - if "T_solve" in solution: + if "T_solve" in solved: total_tetra_time = t_extract + solved["T_solve"] if total_tetra_time > 1000: console_queue.put(f"SLV: Long: {total_tetra_time}") diff --git a/python/PiFinder/sqm/camera_profiles.py b/python/PiFinder/sqm/camera_profiles.py index 61e86e9cc..beeddab07 100644 --- a/python/PiFinder/sqm/camera_profiles.py +++ b/python/PiFinder/sqm/camera_profiles.py @@ -208,7 +208,7 @@ def __repr__(self) -> str: analog_gain=1.0, # Not applicable to processed images digital_gain=1.0, # Already applied during processing bit_depth=8, - bias_offset=8.0, # Conservative - below typical dark pixels (9-12) + bias_offset=6.0, # Calibrated against reference SQM meter read_noise_adu=1.5, # Quantization + residual noise in 8-bit dark_current_rate=0.0, # Negligible after processing thermal_coeff=0.0, diff --git a/python/PiFinder/sqm/sqm.ipynb b/python/PiFinder/sqm/sqm.ipynb index facb0cfa7..490d27c57 100644 --- a/python/PiFinder/sqm/sqm.ipynb +++ b/python/PiFinder/sqm/sqm.ipynb @@ -87,7 +87,6 @@ "logging.getLogger('Solver').setLevel(logging.WARNING)\n", "\n", "# Now try importing\n", - "from breadth_first_combinations import breadth_first_combinations\n", "\n", "\n", "import PiFinder.tetra3.tetra3 as tetra3\n", @@ -208,7 +207,7 @@ "\n", "def show_image(image):\n", " plt.imshow(image, cmap='gray')\n", - " plt.title(f\"Test image\")\n", + " plt.title(\"Test image\")\n", " plt.colorbar()\n", " plt.show() \n", "\n", @@ -1091,7 +1090,7 @@ " \n", " # Check if solve succeeded\n", " if 'matched_centroids' not in solution or len(solution['matched_centroids']) == 0:\n", - " print(f\" ❌ Failed to solve\")\n", + " print(\" ❌ Failed to solve\")\n", " results_summary.append({\n", " 'filename': filename,\n", " 'expected': info['realsqm'],\n", @@ -1129,7 +1128,7 @@ " 'status': 'OK'\n", " })\n", " else:\n", - " print(f\" ❌ Failed to calculate SQM\")\n", + " print(\" ❌ Failed to calculate SQM\")\n", " results_summary.append({\n", " 'filename': filename,\n", " 'expected': info['realsqm'],\n", @@ -1486,7 +1485,7 @@ " print(f\" Stars affected: 
{len(overlapping_stars)}/{len(star_centroids)} ({100*len(overlapping_stars)/len(star_centroids):.0f}%)\")\n", " print()\n", " else:\n", - " print(f\"✓ No aperture overlaps detected\\n\")\n", + " print(\"✓ No aperture overlaps detected\\n\")\n", " \n", " # Calculate alternative mzero methods - filter for valid stars (flux > 0 and mzero not None)\n", " valid_indices = [i for i in range(len(star_fluxes)) \n", @@ -1769,8 +1768,8 @@ " ax4.scatter(log_fluxes, valid_mzeros, s=80, alpha=0.7, c=colors, edgecolors='black', linewidths=1)\n", " \n", " # Horizontal lines\n", - " ax4.axhline(mzero_mean, color='blue', linestyle='-', linewidth=2, label=f'Mean', alpha=0.6)\n", - " ax4.axhline(mzero_median, color='green', linestyle='--', linewidth=2, label=f'Median', alpha=0.6)\n", + " ax4.axhline(mzero_mean, color='blue', linestyle='-', linewidth=2, label='Mean', alpha=0.6)\n", + " ax4.axhline(mzero_median, color='green', linestyle='--', linewidth=2, label='Median', alpha=0.6)\n", " \n", " # Check for trend with flux\n", " if len(valid_mzeros) >= 3:\n", @@ -1783,7 +1782,7 @@ " \n", " ax4.set_xlabel('log₁₀(Flux [ADU])', fontsize=10)\n", " ax4.set_ylabel('mzero', fontsize=10)\n", - " ax4.set_title(f'mzero vs Flux\\n(Should be flat if aperture correct)', fontsize=10, weight='bold')\n", + " ax4.set_title('mzero vs Flux\\n(Should be flat if aperture correct)', fontsize=10, weight='bold')\n", " ax4.legend(fontsize=8, loc='best')\n", " ax4.grid(True, alpha=0.3)\n", " else:\n", @@ -1798,12 +1797,12 @@ " color='steelblue', alpha=0.7, edgecolor='black')\n", " \n", " # Mark different estimators\n", - " ax5.axvline(mzero_mean, color='blue', linestyle='-', linewidth=2, label=f'Mean')\n", - " ax5.axvline(mzero_median, color='green', linestyle='--', linewidth=2, label=f'Median')\n", + " ax5.axvline(mzero_mean, color='blue', linestyle='-', linewidth=2, label='Mean')\n", + " ax5.axvline(mzero_median, color='green', linestyle='--', linewidth=2, label='Median')\n", " if n_clipped > 0:\n", - " ax5.axvline(mzero_sigclip, color='red', linestyle='-.', linewidth=2, label=f'σ-clip')\n", + " ax5.axvline(mzero_sigclip, color='red', linestyle='-.', linewidth=2, label='σ-clip')\n", " if len(valid_mzeros) >= 3:\n", - " ax5.axvline(mzero_trend, color='purple', linestyle=':', linewidth=2, label=f'Trend')\n", + " ax5.axvline(mzero_trend, color='purple', linestyle=':', linewidth=2, label='Trend')\n", " \n", " ax5.set_xlabel('mzero', fontsize=10)\n", " ax5.set_ylabel('Count', fontsize=10)\n", @@ -1995,7 +1994,7 @@ " \n", " # Print detailed overlap information\n", " if overlaps:\n", - " print(f\"\\nDETAILED OVERLAP INFORMATION:\")\n", + " print(\"\\nDETAILED OVERLAP INFORMATION:\")\n", " print(f\"{'='*100}\")\n", " for overlap in overlaps:\n", " i, j = overlap['star1_idx'], overlap['star2_idx']\n", @@ -2006,7 +2005,7 @@ " \n", " # Print summary\n", " print(f\"\\n✓ Processed {filename}\")\n", - " print(f\"\\n WITHOUT overlap correction:\")\n", + " print(\"\\n WITHOUT overlap correction:\")\n", " print(f\" SQM (mean): {sqm_val:.2f} (error: {sqm_val - info['realsqm']:+.2f})\")\n", " print(f\" SQM (median): {sqm_median:.2f} (error: {sqm_median - info['realsqm']:+.2f})\")\n", " print(f\" SQM (σ-clipped): {sqm_sigclip:.2f} (error: {sqm_sigclip - info['realsqm']:+.2f})\")\n", @@ -2014,7 +2013,7 @@ " print(f\" SQM (trend+σ-clip): {sqm_trend_sigclip:.2f} (error: {sqm_trend_sigclip - info['realsqm']:+.2f})\")\n", " \n", " if sqm_val_corrected is not None:\n", - " print(f\"\\n WITH overlap correction:\")\n", + " print(\"\\n WITH overlap 
correction:\")\n", " print(f\" SQM (overlap-corr): {sqm_val_corrected:.2f} (error: {sqm_val_corrected - info['realsqm']:+.2f})\")\n", " print(f\" Stars excluded: {details_corrected.get('n_stars_excluded_overlaps', 0)}/{details_corrected.get('n_matched_stars_original', 0)}\")\n", " print(f\" Improvement: {(sqm_val_corrected - sqm_val):+.2f} mag/arcsec²\")\n", @@ -2039,7 +2038,7 @@ " issues.append(f\"⚠️ {len(overlaps)} aperture overlaps detected ({len(overlapping_stars)} stars affected)\")\n", " \n", " if issues:\n", - " print(f\"\\n Notes:\")\n", + " print(\"\\n Notes:\")\n", " for issue in issues:\n", " print(f\" {issue}\")\n", " \n", diff --git a/python/PiFinder/sqm/sqm.py b/python/PiFinder/sqm/sqm.py index a1250ccbf..82246bd75 100644 --- a/python/PiFinder/sqm/sqm.py +++ b/python/PiFinder/sqm/sqm.py @@ -1,9 +1,6 @@ import numpy as np import logging -from typing import Tuple, Dict, Optional, Any -from datetime import datetime -import time -from PiFinder.state import SQM as SQMState +from typing import Tuple, Dict, Optional from .noise_floor import NoiseFloorEstimator logger = logging.getLogger("Solver") @@ -34,36 +31,22 @@ class SQM: def __init__( self, camera_type: str = "imx296", - pedestal_from_background: bool = False, - use_adaptive_noise_floor: bool = True, ): """ Initialize SQM calculator. Args: - camera_type: Camera model (imx296, imx462, imx290, hq) for noise estimation - pedestal_from_background: If True, automatically estimate pedestal from - median of local backgrounds. Default False (manual pedestal only). - use_adaptive_noise_floor: If True, use adaptive noise floor estimation. - If False, fall back to manual pedestal parameter. Default True. + camera_type: Camera model (imx296, imx462, imx290, hq) for noise estimation. + Use "_processed" suffix for 8-bit ISP-processed images. """ - super() - self.pedestal_from_background = pedestal_from_background - self.use_adaptive_noise_floor = use_adaptive_noise_floor - - # Initialize noise floor estimator if enabled - self.noise_estimator: Optional[NoiseFloorEstimator] = None - if use_adaptive_noise_floor: - self.noise_estimator = NoiseFloorEstimator( - camera_type=camera_type, - enable_zero_sec_sampling=True, - zero_sec_interval=300, # Every 5 minutes - ) - logger.info( - f"SQM initialized with adaptive noise floor estimation (camera: {camera_type})" - ) - else: - logger.info("SQM initialized with manual pedestal mode") + self.noise_estimator = NoiseFloorEstimator( + camera_type=camera_type, + enable_zero_sec_sampling=True, + zero_sec_interval=300, # Every 5 minutes + ) + logger.info( + f"SQM initialized with adaptive noise floor estimation (camera: {camera_type})" + ) def _calc_field_parameters(self, fov_degrees: float) -> None: """Calculate field of view parameters.""" @@ -72,43 +55,24 @@ def _calc_field_parameters(self, fov_degrees: float) -> None: self.pixels_total = 512**2 self.arcsec_squared_per_pixel = self.field_arcsec_squared / self.pixels_total - def _calculate_background( - self, image: np.ndarray, centroids: np.ndarray, exclusion_radius: int - ) -> float: + def _pickering_airmass(self, altitude_deg: float) -> float: """ - Calculate background from star-free regions using median. + Calculate airmass using Pickering (2002) formula. + + More accurate than simple 1/sin(alt) near the horizon. + Accounts for atmospheric refraction. + + Reference: Pickering, K.A. (2002), "The Southern Limits of the Ancient + Star Catalogs", DIO 12, 1-15. 
Args: - image: Image array - centroids: All detected centroids (for masking) - exclusion_radius: Radius around each star to exclude (pixels) + altitude_deg: Altitude in degrees (must be > 0) Returns: - Background level in ADU per pixel + Airmass value (1.0 at zenith, increases toward horizon) """ - height, width = image.shape - mask = np.ones((height, width), dtype=bool) - - # Create coordinate grids - y, x = np.ogrid[:height, :width] - - # Mask out regions around all stars - for cx, cy in centroids: - if 0 <= cx < width and 0 <= cy < height: - star_mask = (x - cx) ** 2 + (y - cy) ** 2 <= exclusion_radius**2 - mask &= ~star_mask - - # Calculate median background from unmasked regions - if np.sum(mask) > 100: # Need enough pixels for reliable median - background_per_pixel = np.median(image[mask]) - else: - # Fallback to percentile if too many stars - background_per_pixel = np.percentile(image, 10) - logger.warning( - f"Using 10th percentile for background (only {np.sum(mask)} unmasked pixels)" - ) - - return float(background_per_pixel) + h = altitude_deg + return 1.0 / np.sin(np.radians(h + 244.0 / (165.0 + 47.0 * h**1.1))) def _measure_star_flux_with_local_background( self, @@ -117,7 +81,8 @@ def _measure_star_flux_with_local_background( aperture_radius: int, annulus_inner_radius: int, annulus_outer_radius: int, - ) -> Tuple[list, list]: + saturation_threshold: int = 250, + ) -> Tuple[list, list, int]: """ Measure star flux with local background from annulus around each star. @@ -127,15 +92,20 @@ def _measure_star_flux_with_local_background( aperture_radius: Aperture radius for star flux in pixels annulus_inner_radius: Inner radius of background annulus in pixels annulus_outer_radius: Outer radius of background annulus in pixels + saturation_threshold: Pixel value threshold for saturation detection (default: 250) + Stars with any aperture pixel >= this value are marked saturated Returns: - Tuple of (star_fluxes, local_backgrounds) where: + Tuple of (star_fluxes, local_backgrounds, n_saturated) where: star_fluxes: Background-subtracted star fluxes (total ADU above local background) + Saturated stars have flux set to -1 to be excluded from mzero calculation local_backgrounds: Local background per pixel for each star (ADU/pixel) + n_saturated: Number of stars excluded due to saturation """ height, width = image.shape star_fluxes = [] local_backgrounds = [] + n_saturated = 0 # Pre-compute squared radii aperture_r2 = aperture_radius**2 @@ -177,8 +147,19 @@ def _measure_star_flux_with_local_background( f"Star at ({cx:.0f},{cy:.0f}) has no annulus pixels, using global median" ) + # Check for saturation in aperture + aperture_pixels = image_patch[aperture_mask] + max_aperture_pixel = np.max(aperture_pixels) if len(aperture_pixels) > 0 else 0 + + if max_aperture_pixel >= saturation_threshold: + # Mark saturated star with flux=-1 to be excluded from mzero calculation + star_fluxes.append(-1) + local_backgrounds.append(local_bg_per_pixel) + n_saturated += 1 + continue + # Total flux in aperture (includes background) - total_flux = np.sum(image_patch[aperture_mask]) + total_flux = np.sum(aperture_pixels) # Subtract background contribution aperture_area_pixels = np.sum(aperture_mask) @@ -188,28 +169,33 @@ def _measure_star_flux_with_local_background( star_fluxes.append(star_flux) local_backgrounds.append(local_bg_per_pixel) - return star_fluxes, local_backgrounds + return star_fluxes, local_backgrounds, n_saturated def _calculate_mzero( self, star_fluxes: list, star_mags: list ) -> 
Tuple[Optional[float], list]: """ - Calculate photometric zero point from calibrated stars. + Calculate photometric zero point from calibrated stars using flux-weighted mean. For point sources: mzero = catalog_mag + 2.5 × log10(total_flux_ADU) This zero point allows converting any ADU measurement to magnitudes: mag = mzero - 2.5 × log10(flux_ADU) + Uses flux-weighted mean: brighter stars have higher SNR so their + mzero estimates are more reliable. + Args: star_fluxes: Background-subtracted star fluxes (ADU) star_mags: Catalog magnitudes for matched stars Returns: - Tuple of (mean_mzero, list_of_individual_mzeros) + Tuple of (weighted_mean_mzero, list_of_individual_mzeros) Note: The mzeros list will contain None for stars with invalid flux """ mzeros: list[Optional[float]] = [] + valid_mzeros = [] + valid_fluxes = [] for flux, mag in zip(star_fluxes, star_mags): if flux <= 0: @@ -222,16 +208,21 @@ def _calculate_mzero( # Calculate zero point: ZP = m + 2.5*log10(F) mzero = mag + 2.5 * np.log10(flux) mzeros.append(mzero) - - # Filter out None values for statistics calculation - valid_mzeros = [mz for mz in mzeros if mz is not None] + valid_mzeros.append(mzero) + valid_fluxes.append(flux) if len(valid_mzeros) == 0: logger.error("No valid stars for mzero calculation") return None, mzeros - # Return mean and the full mzeros list (which may contain None values) - return float(np.mean(valid_mzeros)), mzeros + # Flux-weighted mean: brighter stars contribute more + valid_mzeros_arr = np.array(valid_mzeros) + valid_fluxes_arr = np.array(valid_fluxes) + weighted_mzero = float( + np.average(valid_mzeros_arr, weights=valid_fluxes_arr) + ) + + return weighted_mzero, mzeros def _detect_aperture_overlaps( self, @@ -286,22 +277,20 @@ def _detect_aperture_overlaps( def _atmospheric_extinction(self, altitude_deg: float) -> float: """ - Calculate atmospheric extinction correction to above-atmosphere equivalent. - - Uses simplified airmass model and typical V-band extinction coefficient. + Calculate atmospheric extinction correction. - The atmosphere ALWAYS dims starlight - even at zenith there's 0.28 mag extinction. - This correction accounts for the total atmospheric extinction to estimate what - the sky brightness would be if measured from above the atmosphere. + Uses Pickering (2002) airmass formula for improved accuracy near horizon. + Zenith is the reference point (extinction=0), with additional extinction + added for lower altitudes. 
Args: altitude_deg: Altitude of field center in degrees Returns: Extinction correction in magnitudes (add to measured SQM) - - At zenith (90°): 0.28 mag (minimum) - - At 45°: ~0.40 mag - - At 30°: 0.56 mag + - At zenith (90°): 0.0 mag (reference point) + - At 45°: ~0.12 mag + - At 30°: ~0.28 mag """ if altitude_deg <= 0: logger.warning( @@ -309,29 +298,32 @@ def _atmospheric_extinction(self, altitude_deg: float) -> float: ) return 0.0 - # Simplified airmass calculation - altitude_rad = np.radians(altitude_deg) - airmass = 1.0 / np.sin(altitude_rad) + # Use Pickering (2002) airmass formula for better accuracy near horizon + airmass = self._pickering_airmass(altitude_deg) - # Typical V-band extinction: 0.28 mag/airmass at sea level - # Total extinction is always present (minimum 0.28 mag at zenith) - extinction_correction = 0.28 * airmass + # V-band extinction coefficient: 0.28 mag/airmass + # Following ASTAP convention: zenith is reference point (extinction=0 at zenith) + # Only the ADDITIONAL extinction below zenith is added: k * (airmass - 1) + extinction_correction = 0.28 * (airmass - 1) return extinction_correction + def _determine_pedestal_source(self) -> str: + """Determine the source of the pedestal value for diagnostics.""" + return "adaptive_noise_floor" + def calculate( self, centroids: list, solution: dict, image: np.ndarray, exposure_sec: float, - bias_image: Optional[np.ndarray] = None, altitude_deg: float = 90.0, aperture_radius: int = 5, annulus_inner_radius: int = 6, annulus_outer_radius: int = 14, - pedestal: float = 0.0, correct_overlaps: bool = False, + saturation_threshold: int = 250, ) -> Tuple[Optional[float], Dict]: """ Calculate SQM (Sky Quality Meter) value using local background annuli. @@ -340,17 +332,13 @@ def calculate( centroids: All detected centroids (unused, kept for compatibility) solution: Tetra3 solution dict with 'FOV', 'matched_centroids', 'matched_stars' image: Image array (uint8 or float) - exposure_sec: Exposure time in seconds (required for adaptive noise floor) - bias_image: Optional bias/dark frame for pedestal calculation (default: None) + exposure_sec: Exposure time in seconds (required for noise floor estimation) altitude_deg: Altitude of field center for extinction correction (default: 90 = zenith) aperture_radius: Radius for star photometry in pixels (default: 5) annulus_inner_radius: Inner radius of background annulus in pixels (default: 6) annulus_outer_radius: Outer radius of background annulus in pixels (default: 14) - pedestal: Bias/pedestal level to subtract from background (default: 0) - Only used if use_adaptive_noise_floor=False - If bias_image is provided and pedestal=0, pedestal is calculated from bias_image correct_overlaps: If True, exclude stars with overlapping apertures/annuli (default: False) - Excludes CRITICAL and HIGH overlaps to prevent contamination + saturation_threshold: Pixel value threshold for saturation detection (default: 250) Returns: Tuple of (sqm_value, details_dict) where: @@ -358,17 +346,12 @@ def calculate( details_dict: Dictionary with intermediate values for diagnostics Example: - # Using local annulus backgrounds (handles uneven backgrounds) sqm_value, details = sqm_calculator.calculate( centroids=all_centroids, solution=tetra3_solution, image=np_image, - bias_image=bias_frame, + exposure_sec=0.5, altitude_deg=45.0, - aperture_radius=5, - annulus_inner_radius=6, - annulus_outer_radius=14, - correct_overlaps=True # Exclude overlapping stars ) if sqm_value: @@ -432,61 +415,52 @@ def calculate( ) return 
None, {} - # 0. Determine noise floor / pedestal - noise_floor_details: Dict[str, Any] = {} - - if self.use_adaptive_noise_floor and self.noise_estimator is not None: - # Use adaptive noise floor estimation - noise_floor, noise_floor_details = ( - self.noise_estimator.estimate_noise_floor( - image=image, - exposure_sec=exposure_sec, - percentile=5.0, - ) - ) - pedestal = noise_floor + # 0. Determine noise floor / pedestal using adaptive estimation + noise_floor, noise_floor_details = self.noise_estimator.estimate_noise_floor( + image=image, + exposure_sec=exposure_sec, + percentile=5.0, + ) + # Pedestal = bias_offset + dark_current_contribution + # - Bias offset: electronic pedestal, systematic offset - SUBTRACT + # - Dark current mean: thermal electrons, systematic offset - SUBTRACT + # - Read noise: random fluctuation around 0 - do NOT subtract + # For processed images, dark_current_contribution is ~0 (ISP handles it) + bias_offset = noise_floor_details.get("bias_offset", 0.0) + dark_current_contrib = noise_floor_details.get("dark_current_contribution", 0.0) + pedestal = bias_offset + dark_current_contrib + + logger.info( + f"Adaptive noise floor: {noise_floor:.1f} ADU, " + f"pedestal={pedestal:.1f} (bias={bias_offset:.1f} + dark={dark_current_contrib:.1f}) " + f"(dark_px={noise_floor_details['dark_pixel_smoothed']:.1f}, " + f"theory={noise_floor_details['theoretical_floor']:.1f}, " + f"valid={noise_floor_details['is_valid']})" + ) + # Check if zero-sec sample requested + if noise_floor_details.get("request_zero_sec_sample"): logger.info( - f"Adaptive noise floor: {noise_floor:.1f} ADU " - f"(dark_px={noise_floor_details['dark_pixel_smoothed']:.1f}, " - f"theory={noise_floor_details['theoretical_floor']:.1f}, " - f"valid={noise_floor_details['is_valid']})" + "Zero-second calibration sample requested by noise estimator " + "(will be captured in next cycle)" ) - # Check if zero-sec sample requested - if noise_floor_details.get("request_zero_sec_sample"): - logger.info( - "Zero-second calibration sample requested by noise estimator " - "(will be captured in next cycle)" - ) - else: - # Use manual pedestal (legacy mode) - if bias_image is not None and pedestal == 0.0: - pedestal = float(np.median(bias_image)) - logger.debug(f"Pedestal from bias: {pedestal:.2f} ADU") - elif pedestal > 0: - logger.debug(f"Using manual pedestal: {pedestal:.2f} ADU") - else: - logger.debug("No pedestal applied") - # 1. Measure star fluxes with local background from annulus - star_fluxes, local_backgrounds = self._measure_star_flux_with_local_background( - image, - matched_centroids_arr, - aperture_radius, - annulus_inner_radius, - annulus_outer_radius, + star_fluxes, local_backgrounds, n_saturated = ( + self._measure_star_flux_with_local_background( + image, + matched_centroids_arr, + aperture_radius, + annulus_inner_radius, + annulus_outer_radius, + saturation_threshold, + ) ) - # 1a. Estimate pedestal from median local background if enabled and not already set - if ( - self.pedestal_from_background - and pedestal == 0.0 - and len(local_backgrounds) > 0 - ): - pedestal = float(np.median(local_backgrounds)) - logger.debug( - f"Pedestal estimated from median(local_backgrounds): {pedestal:.2f} ADU" + if n_saturated > 0: + logger.info( + f"Excluded {n_saturated}/{len(matched_centroids_arr)} saturated stars " + f"(threshold={saturation_threshold})" ) # 2. Calculate sky background from median of local backgrounds @@ -516,16 +490,23 @@ def calculate( # 5. 
Convert background to flux density (ADU per arcsec²) background_flux_density = background_corrected / self.arcsec_squared_per_pixel - # 6. Calculate raw SQM + # 6. Calculate SQM (before extinction correction) if background_flux_density <= 0: logger.error(f"Invalid background flux density: {background_flux_density}") return None, {} - sqm_raw = mzero - 2.5 * np.log10(background_flux_density) + sqm_uncorrected = mzero - 2.5 * np.log10(background_flux_density) + + # 7. Apply atmospheric extinction correction (ASTAP convention) + # Following ASTAP: zenith is reference point where extinction = 0 + # Only ADDITIONAL extinction below zenith is added: 0.28 * (airmass - 1) + # This allows comparing measurements at different altitudes + extinction_for_altitude = self._atmospheric_extinction(altitude_deg) # 0.28*(airmass-1) - # 7. Apply atmospheric extinction correction - extinction_correction = self._atmospheric_extinction(altitude_deg) - sqm_final = sqm_raw + extinction_correction + # Main SQM value: no extinction correction (raw measurement) + sqm_final = sqm_uncorrected + # Altitude-corrected value: adds extinction for altitude comparison + sqm_altitude_corrected = sqm_uncorrected + extinction_for_altitude # Filter out None values for statistics in diagnostics valid_mzeros_for_stats = [mz for mz in mzeros if mz is not None] @@ -540,24 +521,12 @@ def calculate( "n_matched_stars_original": n_stars_original, "overlap_correction_enabled": correct_overlaps, "n_stars_excluded_overlaps": n_stars_excluded, + "n_stars_excluded_saturation": n_saturated, + "saturation_threshold": saturation_threshold, "background_per_pixel": background_per_pixel, "background_method": "local_annulus", "pedestal": pedestal, - "pedestal_source": ( - "adaptive_noise_floor" - if self.use_adaptive_noise_floor and self.noise_estimator is not None - else ( - "bias_image" - if bias_image is not None - else ( - "median_local_backgrounds" - if pedestal > 0 - and bias_image is None - and self.pedestal_from_background - else ("manual" if pedestal > 0 else "none") - ) - ) - ), + "pedestal_source": self._determine_pedestal_source(), "noise_floor_details": noise_floor_details if noise_floor_details else None, "exposure_sec": exposure_sec, "background_corrected": background_corrected, @@ -572,10 +541,11 @@ def calculate( float(np.min(valid_mzeros_for_stats)), float(np.max(valid_mzeros_for_stats)), ), - "sqm_raw": sqm_raw, + "sqm_uncorrected": sqm_uncorrected, "altitude_deg": altitude_deg, - "extinction_correction": extinction_correction, + "extinction_for_altitude": extinction_for_altitude, "sqm_final": sqm_final, + "sqm_altitude_corrected": sqm_altitude_corrected, # Per-star details for diagnostics "star_centroids": matched_centroids_arr.tolist(), "star_mags": star_mags, @@ -587,102 +557,8 @@ def calculate( logger.debug( f"SQM: mzero={mzero:.2f}±{np.std(valid_mzeros_for_stats):.2f}, " f"bg={background_flux_density:.6f} ADU/arcsec², pedestal={pedestal:.2f}, " - f"raw={sqm_raw:.2f}, extinction={extinction_correction:.2f}, final={sqm_final:.2f}" + f"raw={sqm_uncorrected:.2f}, ext_alt={extinction_for_altitude:.2f}, " + f"final={sqm_final:.2f}, alt_corr={sqm_altitude_corrected:.2f}" ) return sqm_final, details - - -def update_sqm_if_needed( - shared_state, - sqm_calculator: SQM, - centroids: list, - solution: dict, - image: np.ndarray, - exposure_sec: float, - altitude_deg: float, - calculation_interval_seconds: float = 5.0, - aperture_radius: int = 5, - annulus_inner_radius: int = 6, - annulus_outer_radius: int = 14, -) -> bool: - """ - Check 
if SQM needs updating and calculate/store new value if needed. - - This function encapsulates all the logic for time-based SQM updates: - - Checks if enough time has passed since last update - - Calculates new SQM value if needed - - Updates shared state with new SQM object - - Handles all timestamp conversions and error cases - - Args: - shared_state: SharedStateObj instance to read/write SQM state - sqm_calculator: SQM calculator instance - centroids: List of detected star centroids - solution: Tetra3 solve solution with matched stars - image: Raw image array - exposure_sec: Exposure time in seconds (required for adaptive noise floor) - altitude_deg: Altitude in degrees for extinction correction - calculation_interval_seconds: Minimum time between calculations (default: 5.0) - aperture_radius: Aperture radius for photometry (default: 5) - annulus_inner_radius: Inner annulus radius (default: 6) - annulus_outer_radius: Outer annulus radius (default: 14) - - Returns: - bool: True if SQM was calculated and updated, False otherwise - """ - # Get current SQM state from shared state - current_sqm = shared_state.sqm() - current_time = time.time() - - # Check if we should calculate SQM: - # - No previous calculation (last_update is None), OR - # - Enough time has passed since last update - should_calculate = current_sqm.last_update is None - - if current_sqm.last_update is not None: - try: - last_update_time = datetime.fromisoformat( - current_sqm.last_update - ).timestamp() - should_calculate = ( - current_time - last_update_time - ) >= calculation_interval_seconds - except (ValueError, AttributeError): - # If timestamp parsing fails, recalculate - logger.warning("Failed to parse SQM timestamp, recalculating") - should_calculate = True - - if not should_calculate: - return False - - # Calculate new SQM value - try: - sqm_value, _ = sqm_calculator.calculate( - centroids=centroids, - solution=solution, - image=image, - exposure_sec=exposure_sec, - altitude_deg=altitude_deg, - aperture_radius=aperture_radius, - annulus_inner_radius=annulus_inner_radius, - annulus_outer_radius=annulus_outer_radius, - ) - - if sqm_value is not None: - # Create new SQM state object - new_sqm_state = SQMState( - value=sqm_value, - source="Calculated", - last_update=datetime.now().isoformat(), - ) - shared_state.set_sqm(new_sqm_state) - logger.debug(f"SQM: {sqm_value:.2f} mag/arcsec²") - return True - else: - logger.warning("SQM calculation returned None") - return False - - except Exception as e: - logger.error(f"SQM calculation failed: {e}", exc_info=True) - return False diff --git a/python/PiFinder/state.py b/python/PiFinder/state.py index 03a979855..2fc6fd027 100644 --- a/python/PiFinder/state.py +++ b/python/PiFinder/state.py @@ -146,19 +146,13 @@ class SQM: Sky Quality Meter - represents the sky brightness measurement. """ - value: float = ( - 20.15 # mag/arcsec² - default typical dark sky value (processed 8-bit) - ) - value_raw: Optional[float] = ( - None # mag/arcsec² - from raw 16-bit pipeline (more accurate) - ) + value: float = 20.15 # mag/arcsec² - default typical dark sky value source: str = "None" # "None", "Calculated", "Manual", etc. 
last_update: Optional[str] = None # ISO timestamp of last update def __str__(self): - raw_str = f", raw={self.value_raw:.2f}" if self.value_raw is not None else "" return ( - f"SQM(value={self.value:.2f} mag/arcsec²{raw_str}, " + f"SQM(value={self.value:.2f} mag/arcsec², " f"source={self.source}, " f"last_update={self.last_update or 'Never'})" ) @@ -264,6 +258,8 @@ def __init__(self): self.__imu = None self.__location: Location = Location() self.__sqm: SQM = SQM() + self.__noise_floor: float = 10.0 # Adaptive noise floor in ADU (default fallback) + self.__sqm_details: dict = {} # Full SQM calculation details for calibration self.__datetime = None self.__datetime_time = None self.__screen = None @@ -363,6 +359,22 @@ def set_sqm(self, sqm: SQM): """Update the SQM value""" self.__sqm = sqm + def noise_floor(self) -> float: + """Return the adaptive noise floor in ADU""" + return self.__noise_floor + + def set_noise_floor(self, v: float): + """Update the adaptive noise floor (from SQM calculator)""" + self.__noise_floor = v + + def sqm_details(self) -> dict: + """Return the full SQM calculation details""" + return self.__sqm_details + + def set_sqm_details(self, v: dict): + """Update the SQM calculation details""" + self.__sqm_details = v + def get_sky_brightness(self): """Return just the numeric SQM value for convenience""" return self.__sqm.value diff --git a/python/PiFinder/ui/base.py b/python/PiFinder/ui/base.py index 53a0f0363..e09880ed8 100644 --- a/python/PiFinder/ui/base.py +++ b/python/PiFinder/ui/base.py @@ -25,6 +25,52 @@ def _(a) -> Any: return a +class RotatingInfoDisplay: + """Alternates between constellation and SQM with cross-fade animation.""" + + def __init__(self, shared_state, interval=3.0, fade_speed=0.15): + self.shared_state = shared_state + self.interval = interval + self.fade_speed = fade_speed + self.show_sqm = False + self.last_switch = time.time() + self.progress = 1.0 # 1.0 = stable, <1.0 = transitioning + + def _get_text(self, use_sqm): + if use_sqm: + sqm = self.shared_state.sqm() + return f"{sqm.value:.1f}" if sqm and sqm.value else "---" + else: + sol = self.shared_state.solution() + return sol.get("constellation", "---") if sol else "---" + + def update(self): + """Update state, returns (current_text, previous_text, progress).""" + now = time.time() + if now - self.last_switch >= self.interval: + self.show_sqm = not self.show_sqm + self.last_switch = now + self.progress = 0.0 + if self.progress < 1.0: + self.progress = min(1.0, self.progress + self.fade_speed) + return self._get_text(self.show_sqm), self._get_text(not self.show_sqm), self.progress + + def draw(self, draw, x, y, font, colors, max_brightness=255, inverted=False): + """Draw with cross-fade animation. 
inverted=True for dark text on light bg.""" + current, previous, progress = self.update() + if progress < 1.0: + fade_out = progress < 0.5 + t = progress * 2 if fade_out else (progress - 0.5) * 2 + if inverted: + brightness = int(max_brightness * t) if fade_out else int(max_brightness * (1 - t)) + else: + brightness = int(max_brightness * (1 - t)) if fade_out else int(max_brightness * t) + text = previous if fade_out else current + draw.text((x, y), text, font=font, fill=colors.get(brightness)) + else: + draw.text((x, y), current, font=font, fill=colors.get(0 if inverted else max_brightness)) + + class UIModule: __title__ = "BASE" __help_name__ = "" @@ -97,6 +143,9 @@ def __init__( # anim timer stuff self.last_update_time = time.time() + # Rotating info: alternates between constellation and SQM value + self._rotating_display = RotatingInfoDisplay(self.shared_state) + def active(self): """ Called when a module becomes active @@ -196,6 +245,18 @@ def message(self, message, timeout: float = 2, size=(5, 44, 123, 84)): self.ui_state.set_message_timeout(timeout + time.time()) + def _draw_titlebar_rotating_info(self, x, y, fg): + """Draw rotating constellation/SQM in title bar (dark text on gray bg).""" + self._rotating_display.draw( + self.draw, x, y, self.fonts.bold.font, self.colors, max_brightness=64, inverted=True + ) + + def draw_rotating_info(self, x=10, y=92, font=None): + """Draw rotating constellation/SQM display with cross-fade.""" + self._rotating_display.draw( + self.draw, x, y, font or self.fonts.bold.font, self.colors, max_brightness=255 + ) + def screen_update(self, title_bar=True, button_hints=True) -> None: """ called to trigger UI updates @@ -267,13 +328,11 @@ def screen_update(self, title_bar=True, button_hints=True) -> None: ) if len(self.title) < 9: - # draw the constellation - constellation = solution["constellation"] - self.draw.text( - (self.display_class.resX * 0.54, 1), - constellation, - font=self.fonts.bold.font, - fill=fg if self._unmoved else self.colors.get(32), + # Draw rotating constellation/SQM wheel (replaces static constellation) + self._draw_titlebar_rotating_info( + x=int(self.display_class.resX * 0.54), + y=1, + fg=fg if self._unmoved else self.colors.get(32), ) else: # no solve yet.... 
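# --- Illustrative aside (not part of the diff above) ---------------------------
# A minimal sketch, assuming nothing beyond the standard library, of the
# cross-fade brightness mapping that RotatingInfoDisplay.draw() in
# python/PiFinder/ui/base.py implements inline. The function name
# `crossfade_brightness` is hypothetical; only the mapping itself restates the
# logic shown in the hunk above, pulled out so the ramp can be sanity-checked
# in isolation.

from typing import Tuple


def crossfade_brightness(
    progress: float, max_brightness: int = 255, inverted: bool = False
) -> Tuple[bool, int]:
    """Return (showing_previous, fill_brightness) for progress in [0.0, 1.0).

    The first half of the transition fades the previous text out, the second
    half fades the current text in. With inverted=True (dark text on the light
    title bar) the ramp is mirrored so the text blends into the background
    rather than into black.
    """
    fade_out = progress < 0.5
    t = progress * 2 if fade_out else (progress - 0.5) * 2
    if inverted:
        brightness = max_brightness * t if fade_out else max_brightness * (1 - t)
    else:
        brightness = max_brightness * (1 - t) if fade_out else max_brightness * t
    return fade_out, int(brightness)


if __name__ == "__main__":
    # e.g. at progress 0.25 the previous text sits at half brightness,
    # at 0.75 the incoming text sits at half brightness.
    for p in (0.0, 0.25, 0.49, 0.5, 0.75, 0.99):
        print(p, crossfade_brightness(p))
# --------------------------------------------------------------------------------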
diff --git a/python/PiFinder/ui/exp_sweep.py b/python/PiFinder/ui/exp_sweep.py index 77115bda9..00ef7398a 100644 --- a/python/PiFinder/ui/exp_sweep.py +++ b/python/PiFinder/ui/exp_sweep.py @@ -45,8 +45,8 @@ def __init__(self, *args, **kwargs): self.start_time = None self.sweep_dir = None # Track the actual sweep directory self.initial_file_count = None # Files that existed before we started - self.total_images = 100 # Expected number of images - self.estimated_duration = 240 # 4 minutes estimated + self.total_images = 20 # Expected number of images + self.estimated_duration = 60 # ~1 minute estimated def active(self): """Called when module becomes active""" @@ -153,13 +153,13 @@ def _draw_confirm(self): self.draw.text( (10, 65), - "100 images", + "20 images", font=self.fonts.base.font, fill=self.colors.get(192), ) self.draw.text( (10, 77), - "~4 minutes", + "~1 minute", font=self.fonts.base.font, fill=self.colors.get(192), ) diff --git a/python/PiFinder/ui/sqm.py b/python/PiFinder/ui/sqm.py index 2cd0115f4..b6e15aab2 100644 --- a/python/PiFinder/ui/sqm.py +++ b/python/PiFinder/ui/sqm.py @@ -5,6 +5,7 @@ from PiFinder.ui.ui_utils import TextLayouter from PiFinder.image_util import gamma_correct_med, subtract_background import time +from pathlib import Path from typing import Any, TYPE_CHECKING from PIL import Image, ImageDraw, ImageChops, ImageOps @@ -156,26 +157,38 @@ def update(self, force=False): fill=self.colors.get(64), ) - self.draw.text( - (10, 30), - f"{sqm:.2f}", - font=self.fonts.huge.font, - fill=self.colors.get(192), - ) - # Raw SQM value (if available) in smaller text next to main value - if sqm_state.value_raw is not None: + # Show star count and exposure time (right side) + sqm_details = self.shared_state.sqm_details() + if sqm_details: + n_stars = sqm_details.get("n_matched_stars", 0) self.draw.text( - (95, 50), - f"{sqm_state.value_raw:.2f}", + (60, 20), + f"{n_stars}★", font=self.fonts.base.font, - fill=self.colors.get(128), + fill=self.colors.get(64), ) + + image_metadata = self.shared_state.last_image_metadata() + if image_metadata and "exposure_time" in image_metadata: + exp_ms = image_metadata["exposure_time"] / 1000 # Convert µs to ms + if exp_ms >= 1000: + exp_str = f"{exp_ms/1000:.2f}s" + else: + exp_str = f"{exp_ms:.0f}ms" self.draw.text( - (95, 62), - "raw", - font=self.fonts.small.font, + (95, 20), + exp_str, + font=self.fonts.base.font, fill=self.colors.get(64), ) + + self.draw.text( + (10, 30), + f"{sqm:.2f}", + font=self.fonts.huge.font, + fill=self.colors.get(192), + ) + # Units in small, subtle text self.draw.text( (12, 68), @@ -183,18 +196,42 @@ def update(self, force=False): font=self.fonts.base.font, fill=self.colors.get(64), ) - self.draw.text( - (10, 82), - f"{details['title']}", - font=self.fonts.base.font, - fill=self.colors.get(128), - ) - self.draw.text( - (10, 92), - _("Bortle {bc}").format(bc=details["bortle_class"]), - font=self.fonts.bold.font, - fill=self.colors.get(128), - ) + + # Calibration indicator (right side of units line) + if self._is_calibrated(): + self.draw.text( + (105, 68), + "CAL", + font=self.fonts.base.font, + fill=self.colors.get(128), + ) + else: + self.draw.text( + (98, 68), + "!CAL", + font=self.fonts.base.font, + fill=self.colors.get(64), + ) + + # Show altitude-corrected SQM (scientific value) if available + if sqm_details: + sqm_alt = sqm_details.get("sqm_altitude_corrected") + if sqm_alt: + self.draw.text( + (12, 80), + f"alt: {sqm_alt:.2f}", + font=self.fonts.base.font, + fill=self.colors.get(64), + ) + + # Bortle class 
+ if details: + self.draw.text( + (10, 92), + _("Bortle {bc}").format(bc=details["bortle_class"]), + font=self.fonts.base.font, + fill=self.colors.get(128), + ) # Legend details_text = _("DETAILS") @@ -223,11 +260,30 @@ def key_minus(self): if self.show_description: self.text_layout.previous() + def _is_calibrated(self) -> bool: + """Check if SQM calibration file exists for current camera.""" + camera_type = self.shared_state.camera_type() + camera_type_processed = f"{camera_type}_processed" + calibration_file = ( + Path.home() / "PiFinder_data" / f"sqm_calibration_{camera_type_processed}.json" + ) + return calibration_file.exists() + def active(self): """ Called when a module becomes active i.e. foreground controlling display """ + # Switch to SNR auto-exposure mode for stable longer exposures + self.command_queues["camera"].put("set_ae_mode:snr") + + def inactive(self): + """ + Called when a module becomes inactive + i.e. leaving the SQM screen + """ + # Switch back to PID auto-exposure mode + self.command_queues["camera"].put("set_ae_mode:pid") def _launch_calibration(self, marking_menu, selected_item): """Launch the SQM calibration wizard""" diff --git a/python/PiFinder/ui/sqm_calibration.py b/python/PiFinder/ui/sqm_calibration.py index 5966cc311..fb3eb564f 100644 --- a/python/PiFinder/ui/sqm_calibration.py +++ b/python/PiFinder/ui/sqm_calibration.py @@ -692,8 +692,11 @@ def _analyze_calibration(self): bias_stack = np.array(self.bias_frames, dtype=np.float32) self.bias_offset = float(np.median(bias_stack)) - # 2. Compute read noise (std of all pixels in all bias frames) - self.read_noise = float(np.std(bias_stack)) + # 2. Compute read noise using temporal variance (not spatial) + # Spatial std includes fixed pattern noise (PRNU), which is wrong. + # Temporal variance at each pixel measures true read noise. + temporal_variance = np.var(bias_stack, axis=0) # variance across frames per pixel + self.read_noise = float(np.sqrt(np.mean(temporal_variance))) # 3. Compute dark current rate dark_stack = np.array(self.dark_frames, dtype=np.float32) @@ -710,8 +713,9 @@ def _analyze_calibration(self): bias_stack_raw = np.array(self.bias_frames_raw, dtype=np.float32) self.bias_offset_raw = float(np.median(bias_stack_raw)) - # 2. Compute read noise from raw frames - self.read_noise_raw = float(np.std(bias_stack_raw)) + # 2. Compute read noise using temporal variance (not spatial) + temporal_variance_raw = np.var(bias_stack_raw, axis=0) + self.read_noise_raw = float(np.sqrt(np.mean(temporal_variance_raw))) # 3. 
Compute dark current rate from raw frames dark_stack_raw = np.array(self.dark_frames_raw, dtype=np.float32) @@ -764,9 +768,7 @@ def _calculate_sky_sqm(self, exposure_sec: float): # Create SQM calculator with the newly measured calibration # Use PROCESSED (8-bit) pipeline camera_type_processed = f"{self.shared_state.camera_type()}_processed" - sqm_calc = SQM( - camera_type=camera_type_processed, use_adaptive_noise_floor=True - ) + sqm_calc = SQM(camera_type=camera_type_processed) # Manually set the calibration values we just measured if ( diff --git a/python/PiFinder/ui/sqm_correction.py b/python/PiFinder/ui/sqm_correction.py index aa27f436a..5c29caaa2 100644 --- a/python/PiFinder/ui/sqm_correction.py +++ b/python/PiFinder/ui/sqm_correction.py @@ -5,6 +5,7 @@ import os import json +import time import zipfile import logging from datetime import datetime @@ -137,7 +138,7 @@ def update(self, force=False): (0, message_y), self.error_message, font=self.fonts.base.font, - fill=self.colors.get(255, red=True), + fill=self.colors.get(255), ) else: self.error_message = None @@ -205,6 +206,11 @@ def key_right(self): self.message_time = datetime.now() return + # Show saving message and force screen update + self.success_message = _("Saving...") + self.message_time = datetime.now() + self.update(force=True) + # Create correction package try: zip_path = self._create_correction_package(corrected_sqm) @@ -218,14 +224,62 @@ def key_right(self): self.error_message = _("Save failed") self.message_time = datetime.now() + def _wait_for_exposure(self, target_exp_us: int, timeout: float = 5.0) -> bool: + """Wait for camera to capture image at target exposure.""" + start = time.time() + while time.time() - start < timeout: + metadata = self.shared_state.last_image_metadata() + if metadata and abs(metadata.get("exposure_time", 0) - target_exp_us) < 1000: + # Wait one more frame for image to be ready + time.sleep(0.3) + return True + time.sleep(0.1) + return False + + def _capture_at_exposure( + self, exp_us: int, corrections_dir: Path, timestamp: str, label: str + ) -> Dict[str, Any]: + """Capture image at specific exposure and return file info.""" + # Set exposure + self.command_queues["camera"].put(f"set_exp:{exp_us}") + + # Wait for image at this exposure + if not self._wait_for_exposure(exp_us): + logger.warning(f"Timeout waiting for exposure {exp_us}µs") + return {} + + # Capture images + camera_image = self.camera_image.copy() + raw_image = self.shared_state.cam_raw() + + files: Dict[str, Any] = {} + + # Save processed PNG + processed_path = f"correction_{timestamp}_{label}_processed.png" + camera_image.save(str(corrections_dir / processed_path)) + files["processed"] = processed_path + + # Save raw TIFF if available + if raw_image is not None: + raw_path = f"correction_{timestamp}_{label}_raw.tiff" + raw_image_pil = Image.fromarray(np.asarray(raw_image, dtype=np.uint16)) + raw_image_pil.save(str(corrections_dir / raw_path)) + files["raw"] = raw_path + + files["exposure_us"] = exp_us + return files + def _create_correction_package(self, corrected_sqm: float) -> str: """ - Create a zip file containing correction data. + Create a zip file containing correction data with bracketed exposures. + + Captures 3 exposures: current, +1 stop (2x), -1 stop (0.5x) + If +1 stop exceeds max (1s), captures -2 stops instead. 
Returns: Path to created zip file """ - # Create corrections directory (consistent with captures/ and solver_debug_dumps/) + # Create corrections directory corrections_dir = Path(utils.data_dir) / "captures" / "sqm_corrections" corrections_dir.mkdir(parents=True, exist_ok=True) @@ -234,37 +288,74 @@ def _create_correction_package(self, corrected_sqm: float) -> str: zip_filename = f"sqm_correction_{timestamp}.zip" zip_path = corrections_dir / zip_filename - # Capture current camera image - camera_image = self.camera_image.copy() - - # Get raw image from shared state - raw_image = self.shared_state.cam_raw() - - # Collect metadata + # Get current exposure + image_metadata = self.shared_state.last_image_metadata() + current_exp = image_metadata.get("exposure_time", 500000) if image_metadata else 500000 + + # Calculate bracket exposures (in microseconds) + max_exp = 1000000 # 1 second max + min_exp = 10000 # 10ms min + + exp_above = min(current_exp * 2, max_exp) + exp_below = max(current_exp // 2, min_exp) + exp_below2 = max(current_exp // 4, min_exp) + + # Determine which 3 exposures to capture + if exp_above <= current_exp: + # Can't go above, use current and two below + exposures = [ + (current_exp, "base"), + (exp_below, "minus1"), + (exp_below2, "minus2"), + ] + else: + # Normal bracket: below, current, above + exposures = [ + (exp_below, "minus1"), + (current_exp, "base"), + (exp_above, "plus1"), + ] + + # Collect metadata first (before changing exposure) metadata = self._collect_metadata(corrected_sqm) + metadata["bracket_exposures"] = [] + + # Capture all bracketed exposures + all_files = [] + for exp_us, label in exposures: + # Update saving message + self.success_message = _("Saving {label}...").format(label=label) + self.update(force=True) + + files = self._capture_at_exposure(exp_us, corrections_dir, timestamp, label) + if files: + all_files.append(files) + metadata["bracket_exposures"].append({ + "label": label, + "exposure_us": exp_us, + "exposure_sec": exp_us / 1_000_000.0, + }) + + # Re-enable auto-exposure + self.command_queues["camera"].put("set_exp:auto") # Create zip file with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: - # Add processed 8-bit PNG - processed_path = f"correction_{timestamp}_processed.png" - camera_image.save(str(corrections_dir / processed_path)) - zf.write(corrections_dir / processed_path, arcname=processed_path) - (corrections_dir / processed_path).unlink() # Clean up temp file - - # Add raw 16-bit TIFF if available - if raw_image is not None: - raw_path = f"correction_{timestamp}_raw.tiff" - raw_image_pil = Image.fromarray(np.asarray(raw_image, dtype=np.uint16)) - raw_image_pil.save(str(corrections_dir / raw_path)) - zf.write(corrections_dir / raw_path, arcname=raw_path) - (corrections_dir / raw_path).unlink() # Clean up temp file + # Add all captured images + for files in all_files: + if "processed" in files: + zf.write(corrections_dir / files["processed"], arcname=files["processed"]) + (corrections_dir / files["processed"]).unlink() + if "raw" in files: + zf.write(corrections_dir / files["raw"], arcname=files["raw"]) + (corrections_dir / files["raw"]).unlink() # Add metadata JSON metadata_path = f"correction_{timestamp}_metadata.json" with open(corrections_dir / metadata_path, "w") as f: json.dump(metadata, f, indent=2, default=str) zf.write(corrections_dir / metadata_path, arcname=metadata_path) - (corrections_dir / metadata_path).unlink() # Clean up temp file + (corrections_dir / metadata_path).unlink() return str(zip_path) @@ -279,10 
+370,10 @@ def _collect_metadata(self, corrected_sqm: float) -> Dict[str, Any]: }, } - # Get SQM state for raw value + # Get SQM source sqm_state = self.shared_state.sqm() - if sqm_state.value_raw is not None: - metadata["sqm"]["original_raw"] = sqm_state.value_raw + if sqm_state.source: + metadata["sqm"]["source"] = sqm_state.source # Get GPS location location = self.shared_state.location() @@ -319,11 +410,20 @@ def _collect_metadata(self, corrected_sqm: float) -> Dict[str, Any]: "exposure_us": image_metadata.get("exposure_time"), "exposure_sec": image_metadata.get("exposure_time", 0) / 1_000_000.0, "gain": image_metadata.get("gain"), + "imu_delta": image_metadata.get("imu_delta"), } # Get camera type metadata["camera_type"] = self.shared_state.camera_type() + # Get adaptive noise floor (used for auto-exposure and SQM pedestal) + metadata["noise_floor_adu"] = self.shared_state.noise_floor() + + # Get full SQM calculation details (mzero, background, extinction, etc.) + sqm_details = self.shared_state.sqm_details() + if sqm_details: + metadata["sqm_calculation"] = sqm_details + return metadata def active(self): diff --git a/python/tests/test_auto_exposure.py b/python/tests/test_auto_exposure.py index 5f4c24a57..bd9e539e2 100644 --- a/python/tests/test_auto_exposure.py +++ b/python/tests/test_auto_exposure.py @@ -77,14 +77,15 @@ def test_initialization(self): handler = SweepZeroStarHandler() assert not handler.is_active() assert handler._trigger_count == 2 + # Sweep starts at 400ms, goes up, then tries shorter exposures assert handler._exposures == [ - 25000, - 50000, - 100000, - 200000, 400000, 800000, 1000000, + 200000, + 100000, + 50000, + 25000, ] assert handler._repeats_per_exposure == 2 @@ -104,78 +105,74 @@ def test_trigger_delay(self): assert result is None assert not handler.is_active() - # Second zero - should activate and return first exposure + # Second zero - should activate and return first exposure (400ms) result = handler.handle(50000, 2) - assert result == 25000 + assert result == 400000 assert handler.is_active() - def test_sweep_pattern_short_exposures(self): - """Sweep repeats each short exposure 2 times.""" + def test_sweep_pattern_start_at_400ms(self): + """Sweep starts at 400ms and repeats each exposure 2 times.""" handler = SweepZeroStarHandler(trigger_count=1) - # Activate sweep + # Activate sweep - starts at 400ms result = handler.handle(50000, 1) - assert result == 25000 # First attempt at 25ms + assert result == 400000 # First attempt at 400ms - # Second attempt at 25ms + # Second attempt at 400ms result = handler.handle(50000, 2) - assert result == 25000 + assert result == 400000 - # Move to 50ms + # Move to 800ms result = handler.handle(50000, 3) - assert result == 50000 + assert result == 800000 - # Second attempt at 50ms + # Second attempt at 800ms result = handler.handle(50000, 4) - assert result == 50000 + assert result == 800000 - # Move to 100ms + # Move to 1000ms result = handler.handle(50000, 5) - assert result == 100000 + assert result == 1000000 - def test_400ms_normal_behavior(self): - """Sweep treats 400ms like other exposures (2 attempts).""" + def test_sweep_continues_to_shorter_exposures(self): + """After longer exposures, sweep tries shorter ones.""" handler = SweepZeroStarHandler(trigger_count=1) - # Skip to 400ms by advancing through earlier exposures - # 25ms: 2 times - handler.handle(50000, 1) # 25ms attempt 1 - handler.handle(50000, 2) # 25ms attempt 2 - # 50ms: 2 times - handler.handle(50000, 3) # 50ms attempt 1 - handler.handle(50000, 4) # 
50ms attempt 2 - # 100ms: 2 times - handler.handle(50000, 5) # 100ms attempt 1 - handler.handle(50000, 6) # 100ms attempt 2 - # 200ms: 2 times - handler.handle(50000, 7) # 200ms attempt 1 - handler.handle(50000, 8) # 200ms attempt 2 - - # Now at 400ms - should repeat 2 times like other exposures - result = handler.handle(50000, 9) - assert result == 400000 # Attempt 1 + # 400ms: 2 times (first in sweep) + handler.handle(50000, 1) # 400ms attempt 1 + handler.handle(50000, 2) # 400ms attempt 2 + # 800ms: 2 times + handler.handle(50000, 3) # 800ms attempt 1 + handler.handle(50000, 4) # 800ms attempt 2 + # 1000ms: 2 times + handler.handle(50000, 5) # 1000ms attempt 1 + handler.handle(50000, 6) # 1000ms attempt 2 + + # Now at 200ms - sweep continues with shorter exposures + result = handler.handle(50000, 7) + assert result == 200000 # Attempt 1 - result = handler.handle(50000, 10) - assert result == 400000 # Attempt 2 + result = handler.handle(50000, 8) + assert result == 200000 # Attempt 2 - # After 2 cycles, move to 800ms - result = handler.handle(50000, 11) - assert result == 800000 + # Move to 100ms + result = handler.handle(50000, 9) + assert result == 100000 def test_sweep_wraps_around(self): - """Sweep wraps back to minimum after reaching maximum.""" + """Sweep wraps back to start (400ms) after completing all exposures.""" handler = SweepZeroStarHandler(trigger_count=1) # Fast-forward through entire sweep - # 25ms (2×), 50ms (2×), 100ms (2×), 200ms (2×), 400ms (2×), 800ms (2×), 1000ms (2×) + # 400ms (2×), 800ms (2×), 1000ms (2×), 200ms (2×), 100ms (2×), 50ms (2×), 25ms (2×) total_cycles = 2 + 2 + 2 + 2 + 2 + 2 + 2 # = 14 for i in range(1, total_cycles + 1): handler.handle(50000, i) - # Next cycle should wrap to 25ms + # Next cycle should wrap to 400ms (start of sweep) result = handler.handle(50000, total_cycles + 1) - assert result == 25000 + assert result == 400000 def test_reset(self): """Reset clears handler state.""" @@ -509,18 +506,18 @@ def test_full_zero_star_recovery_cycle(self): result = pid.update(0, current_exposure) assert result is None # Trigger count = 2 - # Zero stars (second time) - sweep activates + # Zero stars (second time) - sweep activates at 400ms result = pid.update(0, current_exposure) - assert result == 25000 # Sweep starts at 25ms + assert result == 400000 # Sweep starts at 400ms assert pid._zero_star_handler.is_active() # Continue sweep result = pid.update(0, current_exposure) - assert result == 25000 # Second attempt at 25ms + assert result == 400000 # Second attempt at 400ms # Still zero stars, sweep continues result = pid.update(0, current_exposure) - assert result == 50000 # Move to 50ms + assert result == 800000 # Move to 800ms # Find stars again - return to PID result = pid.update(15, 50000) diff --git a/python/tests/test_sqm.py b/python/tests/test_sqm.py index c689ca0cd..a66990f74 100644 --- a/python/tests/test_sqm.py +++ b/python/tests/test_sqm.py @@ -11,23 +11,30 @@ class TestSQMExtinction: """ def test_extinction_at_zenith(self): - """Test that extinction at zenith is exactly 0.28 mag (1.0 airmass)""" + """Test that extinction at zenith is 0.0 mag (ASTAP convention: zenith is reference)""" sqm = SQM() extinction = sqm._atmospheric_extinction(90.0) - assert extinction == pytest.approx(0.28, abs=0.0001) + assert extinction == pytest.approx(0.0, abs=0.0001) def test_extinction_at_45_degrees(self): - """Test extinction at 45° altitude (airmass ≈ 1.414)""" + """Test extinction at 45° altitude using Pickering airmass""" sqm = SQM() extinction = 
sqm._atmospheric_extinction(45.0) - expected = 0.28 * 1.414213562373095 # 0.28 * sqrt(2) + # Pickering (2002) airmass at 45° ≈ 1.4124 + # ASTAP convention: 0.28 * (airmass - 1) + pickering_airmass = sqm._pickering_airmass(45.0) + expected = 0.28 * (pickering_airmass - 1) assert extinction == pytest.approx(expected, abs=0.001) def test_extinction_at_30_degrees(self): - """Test extinction at 30° altitude (airmass = 2.0)""" + """Test extinction at 30° altitude using Pickering airmass""" sqm = SQM() extinction = sqm._atmospheric_extinction(30.0) - assert extinction == pytest.approx(0.56, abs=0.001) # 0.28 * 2.0 + # Pickering (2002) airmass at 30° ≈ 1.995 + # ASTAP convention: 0.28 * (airmass - 1) ≈ 0.279 + pickering_airmass = sqm._pickering_airmass(30.0) + expected = 0.28 * (pickering_airmass - 1) + assert extinction == pytest.approx(expected, abs=0.001) def test_extinction_increases_toward_horizon(self): """Test that extinction increases as altitude decreases""" @@ -42,10 +49,13 @@ def test_extinction_increases_toward_horizon(self): ), f"Extinction at {altitudes[i]}° should be less than at {altitudes[i+1]}°" def test_extinction_minimum_is_at_zenith(self): - """Test that zenith (90°) has the minimum possible extinction""" + """Test that zenith (90°) has zero extinction (ASTAP convention)""" sqm = SQM() zenith_extinction = sqm._atmospheric_extinction(90.0) + # Zenith should have exactly zero extinction + assert zenith_extinction == pytest.approx(0.0, abs=0.0001) + # Test various altitudes - all should have more extinction than zenith test_altitudes = [89, 80, 70, 60, 50, 40, 30, 20, 10] for alt in test_altitudes: @@ -63,27 +73,76 @@ def test_extinction_invalid_altitude(self): assert sqm._atmospheric_extinction(-90.0) == 0.0 def test_extinction_airmass_relationship(self): - """Test the airmass formula: airmass = 1 / sin(altitude)""" + """Test the Pickering airmass formula: extinction = 0.28 * (airmass - 1)""" sqm = SQM() - # At 90°: airmass should be 1.0 + # At 90°: airmass ≈ 1.0, extinction ≈ 0 altitude = 90.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.0001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.0001) + assert extinction == pytest.approx(0.0, abs=0.0001) - # At 30°: airmass should be 2.0 + # At 30°: Pickering airmass ≈ 1.995 altitude = 30.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.0001) - assert airmass == pytest.approx(2.0, abs=0.001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.0001) + assert airmass == pytest.approx(1.995, abs=0.01) - # At 6°: airmass ≈ 9.6 (very close to horizon) + # At 6°: Pickering airmass is more accurate near horizon than simple formula altitude = 6.0 - airmass = 1.0 / np.sin(np.radians(altitude)) + airmass = sqm._pickering_airmass(altitude) extinction = sqm._atmospheric_extinction(altitude) - assert extinction == pytest.approx(0.28 * airmass, abs=0.001) + assert extinction == pytest.approx(0.28 * (airmass - 1), abs=0.001) + + +@pytest.mark.unit +class TestPickeringAirmass: + """ + Unit tests for Pickering (2002) airmass formula. 
+ """ + + def test_airmass_at_zenith(self): + """Test airmass at zenith is 1.0""" + sqm = SQM() + airmass = sqm._pickering_airmass(90.0) + assert airmass == pytest.approx(1.0, abs=0.001) + + def test_airmass_at_45_degrees(self): + """Test airmass at 45° altitude""" + sqm = SQM() + airmass = sqm._pickering_airmass(45.0) + # Pickering airmass at 45° ≈ 1.413 (slightly less than simple 1/sin(45°) = 1.414) + assert airmass == pytest.approx(1.413, abs=0.01) + + def test_airmass_at_30_degrees(self): + """Test airmass at 30° altitude""" + sqm = SQM() + airmass = sqm._pickering_airmass(30.0) + # Pickering airmass at 30° ≈ 1.995 (slightly less than simple 1/sin(30°) = 2.0) + assert airmass == pytest.approx(1.995, abs=0.01) + + def test_airmass_at_10_degrees(self): + """Test airmass at 10° altitude shows Pickering correction""" + sqm = SQM() + pickering = sqm._pickering_airmass(10.0) + simple = 1.0 / np.sin(np.radians(10.0)) + # Pickering gives ~5.60, simple gives ~5.76 (3% difference) + assert pickering == pytest.approx(5.60, abs=0.05) + assert simple > pickering # Simple overestimates at low altitudes + + def test_airmass_increases_toward_horizon(self): + """Test airmass increases monotonically toward horizon""" + sqm = SQM() + altitudes = [90, 60, 45, 30, 20, 10, 5] + airmasses = [sqm._pickering_airmass(alt) for alt in altitudes] + + for i in range(len(airmasses) - 1): + assert airmasses[i] < airmasses[i + 1], ( + f"Airmass at {altitudes[i]}° ({airmasses[i]:.3f}) should be less than " + f"at {altitudes[i+1]}° ({airmasses[i+1]:.3f})" + ) @pytest.mark.unit @@ -133,7 +192,7 @@ def test_calculate_returns_tuple(self): assert isinstance(result[1], dict) def test_calculate_extinction_applied(self): - """Test that extinction correction is applied to final SQM value""" + """Test that extinction correction follows ASTAP convention""" sqm = SQM() solution = { @@ -153,39 +212,49 @@ def test_calculate_extinction_applied(self): image[y - 2 : y + 3, x - 2 : x + 3] += 5000 # Calculate at zenith - sqm_zenith, details_zenith = sqm.calculate( + # Note: Use high saturation threshold for uint16 test images + _sqm_zenith, details_zenith = sqm.calculate( centroids=centroids, solution=solution, image=image, exposure_sec=0.5, altitude_deg=90.0, + saturation_threshold=65000, ) # Calculate at 30° (2× airmass) - sqm_30deg, details_30deg = sqm.calculate( + _sqm_30deg, details_30deg = sqm.calculate( centroids=centroids, solution=solution, image=image, exposure_sec=0.5, altitude_deg=30.0, + saturation_threshold=65000, + ) + + # Check extinction values (ASTAP convention: 0 at zenith) + # Pickering airmass at 30° ≈ 1.995, so extinction ≈ 0.28 * 0.995 ≈ 0.279 + assert details_zenith["extinction_for_altitude"] == pytest.approx(0.0, abs=0.001) + expected_ext_30 = 0.28 * (sqm._pickering_airmass(30.0) - 1) + assert details_30deg["extinction_for_altitude"] == pytest.approx( + expected_ext_30, abs=0.001 ) - # Check extinction values - assert details_zenith["extinction_correction"] == pytest.approx(0.28, abs=0.001) - assert details_30deg["extinction_correction"] == pytest.approx(0.56, abs=0.001) + # Uncorrected SQM should be the same (same image, same stars) + assert details_zenith["sqm_uncorrected"] == pytest.approx( + details_30deg["sqm_uncorrected"], abs=0.001 + ) - # Raw SQM should be the same (same image, same stars) - assert details_zenith["sqm_raw"] == pytest.approx( - details_30deg["sqm_raw"], abs=0.001 + # sqm_final is raw (no extinction), sqm_altitude_corrected adds extinction + # At zenith: sqm_final == sqm_altitude_corrected 
(extinction is 0) + assert details_zenith["sqm_final"] == pytest.approx( + details_zenith["sqm_altitude_corrected"], abs=0.001 ) - # Final SQM should differ by extinction difference - extinction_diff = ( - details_30deg["extinction_correction"] - - details_zenith["extinction_correction"] + # At 30°: sqm_altitude_corrected = sqm_final + extinction + assert details_30deg["sqm_altitude_corrected"] == pytest.approx( + details_30deg["sqm_final"] + expected_ext_30, abs=0.001 ) - sqm_diff = sqm_30deg - sqm_zenith - assert sqm_diff == pytest.approx(extinction_diff, abs=0.001) def test_calculate_missing_fov(self): """Test that calculate() returns None when FOV is missing""" @@ -276,3 +345,341 @@ def test_field_parameters_different_fov(self): assert sqm.arcsec_squared_per_pixel == pytest.approx( expected_arcsec_sq_per_pixel, abs=0.01 ), f"Failed for FOV={fov}" + + +@pytest.mark.unit +class TestMzeroCalculation: + """Unit tests for photometric zero point calculation.""" + + def test_mzero_single_star(self): + """Test mzero calculation with a single star.""" + sqm = SQM() + # mzero = mag + 2.5 * log10(flux) + # For flux=1000, mag=5.0: mzero = 5.0 + 2.5*log10(1000) = 5.0 + 7.5 = 12.5 + fluxes = [1000.0] + mags = [5.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + expected = 5.0 + 2.5 * np.log10(1000) + assert mzero == pytest.approx(expected, abs=0.001) + assert len(mzeros) == 1 + assert mzeros[0] == pytest.approx(expected, abs=0.001) + + def test_mzero_flux_weighted_mean(self): + """Test that mzero uses flux-weighted mean (brighter stars weighted more).""" + sqm = SQM() + # Two stars: one bright (high flux), one dim (low flux) + # The bright star's mzero should dominate + fluxes = [10000.0, 100.0] # 100x difference + mags = [4.0, 8.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + # Individual mzeros + mzero_bright = 4.0 + 2.5 * np.log10(10000) # = 14.0 + mzero_dim = 8.0 + 2.5 * np.log10(100) # = 13.0 + + # Flux-weighted: (14.0*10000 + 13.0*100) / (10000+100) ≈ 13.99 + expected_weighted = (mzero_bright * 10000 + mzero_dim * 100) / (10000 + 100) + + assert mzero == pytest.approx(expected_weighted, abs=0.001) + assert mzeros[0] == pytest.approx(mzero_bright, abs=0.001) + assert mzeros[1] == pytest.approx(mzero_dim, abs=0.001) + + def test_mzero_skips_negative_flux(self): + """Test that stars with negative/zero flux are skipped.""" + sqm = SQM() + fluxes = [1000.0, -1.0, 500.0] # Middle star is saturated (flux=-1) + mags = [5.0, 6.0, 5.5] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + # Should only use stars 0 and 2 + assert mzero is not None + assert mzeros[0] is not None + assert mzeros[1] is None # Skipped + assert mzeros[2] is not None + + def test_mzero_all_invalid_returns_none(self): + """Test that all invalid fluxes returns None.""" + sqm = SQM() + fluxes = [-1.0, 0.0, -5.0] + mags = [5.0, 6.0, 7.0] + + mzero, mzeros = sqm._calculate_mzero(fluxes, mags) + + assert mzero is None + assert all(m is None for m in mzeros) + + +@pytest.mark.unit +class TestApertureOverlapDetection: + """Unit tests for aperture overlap detection.""" + + def test_no_overlap_far_apart(self): + """Test that far apart stars have no overlap.""" + sqm = SQM() + # Two stars 100 pixels apart + centroids = np.array([[100.0, 100.0], [200.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + assert len(excluded) == 0 + + def 
test_critical_overlap_apertures_touch(self): + """Test CRITICAL overlap when apertures overlap (distance < 2*aperture_radius).""" + sqm = SQM() + # Two stars 8 pixels apart, aperture_radius=5, so 2*5=10 > 8 + centroids = np.array([[100.0, 100.0], [108.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + # Both stars should be excluded + assert 0 in excluded + assert 1 in excluded + + def test_high_overlap_aperture_in_annulus(self): + """Test HIGH overlap when aperture enters another star's annulus.""" + sqm = SQM() + # Stars 15 pixels apart: aperture(5) + annulus_outer(14) = 19 > 15 + # This means star 1's aperture is inside star 0's annulus + centroids = np.array([[100.0, 100.0], [115.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + # Both should be excluded due to HIGH overlap + assert 0 in excluded + assert 1 in excluded + + def test_no_overlap_at_threshold(self): + """Test no overlap when exactly at safe distance.""" + sqm = SQM() + # Stars 20 pixels apart: aperture(5) + annulus_outer(14) = 19 < 20 + centroids = np.array([[100.0, 100.0], [120.0, 100.0]]) + aperture_radius = 5 + annulus_inner = 6 + annulus_outer = 14 + + excluded = sqm._detect_aperture_overlaps( + centroids, aperture_radius, annulus_inner, annulus_outer + ) + + assert len(excluded) == 0 + + +@pytest.mark.unit +class TestNoiseFloorEstimation: + """Unit tests for adaptive noise floor estimation.""" + + def test_temporal_noise_calculation(self): + """Test temporal noise = read_noise + dark_current * exposure.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # imx296_processed has read_noise=1.5, dark_current=0.0 + + noise = estimator._estimate_temporal_noise(exposure_sec=1.0) + + # With dark_current=0, temporal_noise = read_noise = 1.5 + assert noise == pytest.approx(1.5, abs=0.01) + + def test_noise_floor_uses_theory_when_dark_pixels_below_bias(self): + """Test that theory is used when dark pixels are impossibly low.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # bias_offset for imx296_processed is 6.0 + + # Create image with all pixels below bias offset (impossible in reality) + image = np.full((100, 100), 3.0, dtype=np.float32) + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should use theoretical floor since dark pixels (3.0) < bias (6.0) + assert noise_floor >= estimator.profile.bias_offset + + def test_noise_floor_uses_measured_when_valid(self): + """Test that measured dark pixels are used when valid.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + # bias_offset=6.0, read_noise=1.5, dark_current=0.0 + # theoretical_floor = 6.0 + 1.5 = 7.5 + + # Create image with 5th percentile around 8.0 (valid, above bias) + # Most pixels higher, some low pixels around 8 + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + image[0:5, 0:5] = 8.0 # Dark corner + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should use min(measured, theoretical) + # 5th percentile should be around 8, theoretical is 7.5 + # So should use theoretical 
7.5 + assert noise_floor == pytest.approx(7.5, abs=0.5) + + def test_noise_floor_clamped_to_bias_offset(self): + """Test that noise floor is never below bias offset.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Even with weird inputs, should never go below bias + image = np.full((100, 100), 100.0, dtype=np.float32) + + noise_floor, _ = estimator.estimate_noise_floor(image, exposure_sec=0.5) + + assert noise_floor >= estimator.profile.bias_offset + + def test_history_smoothing_after_multiple_estimates(self): + """Test that history smoothing kicks in after 5+ estimates.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # First 4 estimates - no smoothing yet + for i in range(4): + image = np.full((100, 100), 10.0 + i, dtype=np.float32) + _, details = estimator.estimate_noise_floor(image, exposure_sec=0.5) + assert details["n_history_samples"] == i + 1 + + # 5th estimate - smoothing should kick in + image = np.full((100, 100), 15.0, dtype=np.float32) + _, details = estimator.estimate_noise_floor(image, exposure_sec=0.5) + assert details["n_history_samples"] == 5 + # dark_pixel_smoothed should be median of history, not raw value + # History: [10, 11, 12, 13, 15] -> median = 12 + assert details["dark_pixel_smoothed"] == pytest.approx(12.0, abs=0.1) + + def test_update_with_zero_sec_sample(self): + """Test zero-second sample updates profile gradually.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + original_bias = estimator.profile.bias_offset + + # Need 3 samples before profile updates + for i in range(3): + # Zero-sec image with bias around 10 + zero_sec = np.random.normal(10.0, 1.5, (100, 100)).astype(np.float32) + estimator.update_with_zero_sec_sample(zero_sec) + + # After 3 samples, profile should update with EMA (alpha=0.2) + # new_bias = 0.2 * measured + 0.8 * original + expected_bias = 0.2 * 10.0 + 0.8 * original_bias + assert estimator.profile.bias_offset == pytest.approx(expected_bias, abs=0.5) + + def test_validate_estimate_too_close_to_median(self): + """Test validation fails when noise floor is too close to image median.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Image where darkest pixels are close to median (uniform image) + # This simulates a situation with no stars/sky gradient + image = np.full((100, 100), 10.0, dtype=np.float32) + + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + # Should be invalid because noise floor (theoretical ~7.5) is close to median (10) + # Actually 7.5 is not > 10 * 0.8 = 8, so let's use a different test + # Need noise floor > median * 0.8 to trigger this + # Create image where dark pixels are above theoretical floor + image2 = np.full((100, 100), 8.0, dtype=np.float32) + estimator2 = NoiseFloorEstimator(camera_type="imx296_processed") + + _, details2 = estimator2.estimate_noise_floor(image2, exposure_sec=0.5) + + # noise_floor will be min(8.0, 7.5) = 7.5 + # median = 8.0, threshold = 8.0 * 0.8 = 6.4 + # 7.5 > 6.4, so should be invalid + assert details2["is_valid"] is False + assert "median" in details2["validation_reason"].lower() + + def test_get_statistics(self): + """Test get_statistics returns expected data.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = 
NoiseFloorEstimator(camera_type="imx296_processed") + + # Do a few estimates + for _ in range(3): + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + stats = estimator.get_statistics() + + assert stats["camera_type"] == "imx296_processed" + assert stats["n_estimates"] == 3 + assert stats["n_history_samples"] == 3 + assert "dark_pixel_mean" in stats + assert "dark_pixel_std" in stats + assert "dark_pixel_median" in stats + + def test_reset_clears_state(self): + """Test reset clears all history and statistics.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Build up some state + for _ in range(5): + image = np.random.uniform(10, 50, (100, 100)).astype(np.float32) + estimator.estimate_noise_floor(image, exposure_sec=0.5) + + assert estimator.n_estimates == 5 + assert len(estimator.dark_pixel_history) == 5 + + # Reset + estimator.reset() + + assert estimator.n_estimates == 0 + assert len(estimator.dark_pixel_history) == 0 + assert len(estimator.zero_sec_history) == 0 + + def test_save_and_load_calibration(self, tmp_path, monkeypatch): + """Test calibration save/load round-trip.""" + from PiFinder.sqm.noise_floor import NoiseFloorEstimator + + # Redirect Path.home() to tmp_path + monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path) + + # Create PiFinder_data directory + (tmp_path / "PiFinder_data").mkdir() + + estimator = NoiseFloorEstimator(camera_type="imx296_processed") + + # Save calibration with new values + result = estimator.save_calibration( + bias_offset=25.0, read_noise=3.5, dark_current_rate=0.5 + ) + assert result is True + + # Verify profile was updated + assert estimator.profile.bias_offset == 25.0 + assert estimator.profile.read_noise_adu == 3.5 + assert estimator.profile.dark_current_rate == 0.5 + + # Create new estimator - should load the saved calibration + estimator2 = NoiseFloorEstimator(camera_type="imx296_processed") + + # Should have loaded the saved values (not the defaults) + assert estimator2.profile.bias_offset == 25.0 + assert estimator2.profile.read_noise_adu == 3.5 + assert estimator2.profile.dark_current_rate == 0.5
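
Note on the noise-floor tests above: PiFinder.sqm.noise_floor.NoiseFloorEstimator itself is not part of this diff, but the tests pin down the model it implements — a theoretical floor of bias_offset + read_noise + dark_current·exposure, a min() against a measured dark-pixel level (falling back to theory when the measurement is impossibly low), clamping to the bias offset, and a validity check against the image median. The sketch below is a hypothetical condensation of that behaviour; the imx296_processed constants and the 5th-percentile dark-pixel estimate are assumptions taken from the test comments, not from the production module.

# Hypothetical sketch of the noise-floor model the tests above encode; the
# real estimator may differ in detail (history smoothing, zero-sec updates,
# persisted calibration are omitted here).
import numpy as np

# Assumed imx296_processed profile values, taken from the test comments.
PROFILE = {"bias_offset": 6.0, "read_noise_adu": 1.5, "dark_current_rate": 0.0}

def estimate_noise_floor_sketch(image: np.ndarray, exposure_sec: float) -> tuple:
    """Return (noise_floor_adu, is_valid) following the behaviour asserted above."""
    bias = PROFILE["bias_offset"]
    # Temporal noise grows with exposure via the dark-current term.
    temporal_noise = PROFILE["read_noise_adu"] + PROFILE["dark_current_rate"] * exposure_sec
    theoretical_floor = bias + temporal_noise          # 6.0 + 1.5 = 7.5 ADU here

    measured_dark = float(np.percentile(image, 5))     # darkest-pixel estimate (assumed percentile)
    if measured_dark < bias:
        noise_floor = theoretical_floor                # measurement impossibly low -> use theory
    else:
        noise_floor = min(measured_dark, theoretical_floor)
    noise_floor = max(noise_floor, bias)               # never report a floor below the bias offset

    # Sanity check: a floor close to the image median means there were no usable dark pixels.
    is_valid = noise_floor <= 0.8 * float(np.median(image))
    return noise_floor, is_valid


if __name__ == "__main__":
    # Uniform 8 ADU frame: floor = min(8.0, 7.5) = 7.5, but 7.5 > 0.8 * 8, so it is flagged invalid,
    # matching test_validate_estimate_too_close_to_median.
    frame = np.full((100, 100), 8.0, dtype=np.float32)
    print(estimate_noise_floor_sketch(frame, exposure_sec=0.5))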