diff --git a/docs/source/dev_arch.rst b/docs/source/dev_arch.rst index 1f512ddc7..969ea6cef 100644 --- a/docs/source/dev_arch.rst +++ b/docs/source/dev_arch.rst @@ -219,12 +219,12 @@ There are three types of shared state in PiFinder SharedStateObj( power_state=1, solve_state=True, - solution={'RA': 22.86683471463411, 'Dec': 15.347716050003328, 'imu_pos': [171.39798541261814, 202.7646132036331, 358.2794741322842], + solution={'RA': 22.86683471463411, 'Dec': 15.347716050003328, 'solve_time': 1695297930.5532792, 'cam_solve_time': 1695297930.5532837, 'Roll': 306.2951794424281, 'FOV': 10.200729425086111, RMSE': 21.995567413046142, 'Matches': 12, 'Prob': 6.987725483613384e-13, 'T_solve': 15.00384000246413, 'RA_target': 22.86683471463411, 'Dec_target': 15.347716050003328, 'T_extract': 75.79255499877036, 'Alt': None, 'Az': None, 'solve_source': 'CAM', 'constellation': 'Psc'}, - imu={'moving': False, 'move_start': 1695297928.69749, 'move_end': 1695297928.764207, 'pos': [171.39798541261814, 202.7646132036331, 358.2794741322842], - 'start_pos': [171.4009455613444, 202.76321535004726, 358.2587208386012], 'status': 3}, + imu={'moving': False, 'move_start': 1695297928.69749, 'move_end': 1695297928.764207, + 'status': 3}, location={'lat': 59.05139745, 'lon': 7.987654, 'altitude': 151.4, 'gps_lock': False, 'timezone': 'Europe/Stockholm', 'last_gps_lock': None}, datetime=None, screen=, diff --git a/python/PiFinder/calc_utils.py b/python/PiFinder/calc_utils.py index 0064009c6..1a5edb0e9 100644 --- a/python/PiFinder/calc_utils.py +++ b/python/PiFinder/calc_utils.py @@ -168,7 +168,7 @@ def aim_degrees(shared_state, mount_type, screen_direction, target): az_diff = target_az - solution["Az"] az_diff = (az_diff + 180) % 360 - 180 if screen_direction in ["flat", "as_bloom"]: - az_diff *= -1 + az_diff *= -1 # TODO: Why should this depend on the screen type? alt_diff = target_alt - solution["Alt"] alt_diff = (alt_diff + 180) % 360 - 180 @@ -177,8 +177,11 @@ def aim_degrees(shared_state, mount_type, screen_direction, target): else: # EQ Mount type ra_diff = target.ra - solution["RA"] + ra_diff = (ra_diff + 180) % 360 - 180 # Convert to -180 to +180 + dec_diff = target.dec - solution["Dec"] dec_diff = (dec_diff + 180) % 360 - 180 + return ra_diff, dec_diff return None, None @@ -256,7 +259,8 @@ def hadec_to_roll(ha_deg, dec_deg, lat_deg): if dec_deg <= lat_deg: roll_deg = -pa_deg else: - roll_deg = -pa_deg + np.sign(ha_deg) * 180 + roll_deg = -pa_deg + #roll_deg = -pa_deg + np.sign(ha_deg) * 180 # Disable for testing TODO: Check this return roll_deg diff --git a/python/PiFinder/camera_interface.py b/python/PiFinder/camera_interface.py index 9752f55c0..6221148c9 100644 --- a/python/PiFinder/camera_interface.py +++ b/python/PiFinder/camera_interface.py @@ -8,17 +8,17 @@ * Takes full res images on demand """ - -import datetime -import logging -import os -import queue -import time from typing import Tuple, Optional - from PIL import Image +import os +import time +import datetime +import numpy as np +import queue +import logging from PiFinder import state_utils, utils +import PiFinder.pointing_model.quaternion_transforms as qt from PiFinder.auto_exposure import ( ExposurePIDController, ExposureSNRController, @@ -177,20 +177,23 @@ def get_image_loop( imu_end = shared_state.imu() # see if we moved during exposure - reading_diff = 0 if imu_start and imu_end: - reading_diff = ( - abs(imu_start["pos"][0] - imu_end["pos"][0]) - + abs(imu_start["pos"][1] - imu_end["pos"][1]) - + abs(imu_start["pos"][2] - imu_end["pos"][2]) + # Returns the pointing difference between successive IMU quaternions as + # an angle (radians). Note that this also accounts for rotation around the + # scope axis. Returns an angle in radians. + pointing_diff = qt.get_quat_angular_diff( + imu_start["quat"], imu_end["quat"] ) + else: + pointing_diff = 0.0 camera_image.paste(base_image) + image_metadata = { "exposure_start": image_start_time, "exposure_end": image_end_time, "imu": imu_end, - "imu_delta": reading_diff, + "imu_delta": np.rad2deg(pointing_diff), "exposure_time": self.exposure_time, "gain": self.gain, } diff --git a/python/PiFinder/imu_pi.py b/python/PiFinder/imu_pi.py index e1d7744ad..ed737f697 100644 --- a/python/PiFinder/imu_pi.py +++ b/python/PiFinder/imu_pi.py @@ -6,65 +6,39 @@ """ import time +from PiFinder import config from PiFinder.multiproclogging import MultiprocLogging import board import adafruit_bno055 import logging - -from scipy.spatial.transform import Rotation - -from PiFinder import config +import quaternion # Numpy quaternion logger = logging.getLogger("IMU.pi") QUEUE_LEN = 10 -MOVE_CHECK_LEN = 2 class Imu: + """ + Previous version modified the IMU axes but the IMU now outputs the + measurements using its native axes and the transformation from the IMU + axes to the camera frame is done by the IMU dead-reckonig functionality. + """ + def __init__(self): i2c = board.I2C() self.sensor = adafruit_bno055.BNO055_I2C(i2c) + # IMPLUS mode: Accelerometer + Gyro + Fusion data self.sensor.mode = adafruit_bno055.IMUPLUS_MODE # self.sensor.mode = adafruit_bno055.NDOF_MODE - cfg = config.Config() - if ( - cfg.get_option("screen_direction") == "flat" - or cfg.get_option("screen_direction") == "straight" - or cfg.get_option("screen_direction") == "flat3" - ): - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_NEGATIVE, - ) - elif cfg.get_option("screen_direction") == "as_bloom": - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - ) - else: - self.sensor.axis_remap = ( - adafruit_bno055.AXIS_REMAP_Z, - adafruit_bno055.AXIS_REMAP_Y, - adafruit_bno055.AXIS_REMAP_X, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - adafruit_bno055.AXIS_REMAP_POSITIVE, - ) + self.quat_history = [(0, 0, 0, 0)] * QUEUE_LEN self._flip_count = 0 self.calibration = 0 - self.avg_quat = (0, 0, 0, 0) + self.avg_quat = (0, 0, 0, 0) # Scalar-first quaternion as float: (w, x, y, z) self.__moving = False - + self.__reading_diff = 0.0 + self.last_sample_time = time.time() # Calibration settings @@ -74,23 +48,12 @@ def __init__(self): # to start moving, second is threshold to fall below # to stop moving. + cfg = config.Config() imu_threshold_scale = cfg.get_option("imu_threshold_scale", 1) self.__moving_threshold = ( 0.0005 * imu_threshold_scale, 0.0003 * imu_threshold_scale, ) - - def quat_to_euler(self, quat): - if quat[0] + quat[1] + quat[2] + quat[3] == 0: - return 0, 0, 0 - rot = Rotation.from_quat(quat) - rot_euler = rot.as_euler("xyz", degrees=True) - # convert from -180/180 to 0/360 - rot_euler[0] += 180 - rot_euler[1] += 180 - rot_euler[2] += 180 - return rot_euler - def moving(self): """ Compares most recent reading @@ -110,6 +73,7 @@ def update(self): if self.calibration == 0: logger.warning("NOIMU CAL") return True + # adafruit_bno055 uses quaternion convention (w, x, y, z) quat = self.sensor.quaternion if quat[0] is None: logger.warning("IMU: Failed to get sensor values") @@ -132,6 +96,9 @@ def update(self): # Sometimes the quat output will 'flip' and change by 2.0+ # from one reading to another. This is clearly noise or an # artifact, so filter them out + # + # NOTE: This is probably due to the double-cover property of quaternions + # where +q and -q describe the same rotation? if self.__reading_diff > 1.5: self._flip_count += 1 if self._flip_count > 10: @@ -148,7 +115,9 @@ def update(self): # no flip self._flip_count = 0 + # avg_quat is the latest quaternion measurement, not the average self.avg_quat = quat + # Write over the quat_hisotry queue FIFO: if len(self.quat_history) == QUEUE_LEN: self.quat_history = self.quat_history[1:] self.quat_history.append(quat) @@ -160,9 +129,6 @@ def update(self): if self.__reading_diff > self.__moving_threshold[0]: self.__moving = True - def get_euler(self): - return list(self.quat_to_euler(self.avg_quat)) - def __str__(self): return ( f"IMU Information:\n" @@ -195,35 +161,40 @@ def imu_monitor(shared_state, console_queue, log_queue): imu = Imu() imu_calibrated = False + # TODO: Remove move_start, move_end imu_data = { "moving": False, "move_start": None, "move_end": None, - "pos": [0, 0, 0], - "quat": [0, 0, 0, 0], - "start_pos": [0, 0, 0], + "quat": quaternion.quaternion( + 0, 0, 0, 0 + ), # Scalar-first numpy quaternion(w, x, y, z) - Init to invalid quaternion "status": 0, } + while True: imu.update() imu_data["status"] = imu.calibration + + # TODO: move_start and move_end don't seem to be used? if imu.moving(): if not imu_data["moving"]: logger.debug("IMU: move start") imu_data["moving"] = True - imu_data["start_pos"] = imu_data["pos"] imu_data["move_start"] = time.time() - imu_data["pos"] = imu.get_euler() - imu_data["quat"] = imu.avg_quat - + # DISABLE old method + imu_data["quat"] = quaternion.from_float_array( + imu.avg_quat + ) # Scalar-first (w, x, y, z) else: if imu_data["moving"]: # If we were moving and we now stopped logger.debug("IMU: move end") imu_data["moving"] = False - imu_data["pos"] = imu.get_euler() - imu_data["quat"] = imu.avg_quat imu_data["move_end"] = time.time() + imu_data["quat"] = quaternion.from_float_array( + imu.avg_quat + ) # Scalar-first (w, x, y, z) if not imu_calibrated: if imu_data["status"] == 3: @@ -242,6 +213,7 @@ def imu_monitor(shared_state, console_queue, log_queue): imu = Imu() for i in range(10): imu.update() + print(imu) time.sleep(0.5) except Exception as e: logger.exception("Error starting phyiscal IMU", e) diff --git a/python/PiFinder/integrator.py b/python/PiFinder/integrator.py index a56cc46eb..716956857 100644 --- a/python/PiFinder/integrator.py +++ b/python/PiFinder/integrator.py @@ -5,90 +5,64 @@ * Checks IMU * Plate solves high-res image +TODO: +- Rename solved --> pointing_estimate (also includes IMU) +- Rename next_image_solved --> new_solve +- Rename last_image_solve --> prev_solve (previous successful solve) +- Simplify program flow and explain in comments at top +- Refactor into class PointingTracker + """ +import datetime import queue import time import copy import logging +import numpy as np +import quaternion # numpy-quaternion from PiFinder import config from PiFinder import state_utils import PiFinder.calc_utils as calc_utils from PiFinder.multiproclogging import MultiprocLogging +from PiFinder.pointing_model.astro_coords import initialized_solved_dict, RaDecRoll +from PiFinder.pointing_model.imu_dead_reckoning import ImuDeadReckoning +import PiFinder.pointing_model.quaternion_transforms as qt -IMU_ALT = 2 -IMU_AZ = 0 logger = logging.getLogger("IMU.Integrator") - -def imu_moved(imu_a, imu_b): - """ - Compares two IMU states to determine if they are the 'same' - if either is none, returns False - """ - if imu_a is None: - return False - if imu_b is None: - return False - - # figure out the abs difference - diff = ( - abs(imu_a[0] - imu_b[0]) + abs(imu_a[1] - imu_b[1]) + abs(imu_a[2] - imu_b[2]) - ) - if diff > 0.001: - return True - return False +# Constants: +# Use IMU tracking if the angle moved is above this +# TODO: May need to adjust this depending on the IMU sensitivity thresholds +IMU_MOVED_ANG_THRESHOLD = np.deg2rad(0.06) def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=False): MultiprocLogging.configurer(log_queue) + """ """ + if is_debug: + logger.setLevel(logging.DEBUG) + logger.debug("Starting Integrator") + try: - if is_debug: - logger.setLevel(logging.DEBUG) - logger.debug("Starting Integrator") - - solved = { - "RA": None, - "Dec": None, - "Roll": None, - "camera_center": { - "RA": None, - "Dec": None, - "Roll": None, - "Alt": None, - "Az": None, - }, - "camera_solve": { - "RA": None, - "Dec": None, - "Roll": None, - }, - "Roll_offset": 0, # May/may not be needed - for experimentation - "imu_pos": None, - "Alt": None, - "Az": None, - "solve_source": None, - "solve_time": None, - "cam_solve_time": 0, - "constellation": None, - } + # Dict of RA, Dec, etc. initialized to None: + solved = initialized_solved_dict() cfg = config.Config() - if ( - cfg.get_option("screen_direction") == "left" - or cfg.get_option("screen_direction") == "flat" - or cfg.get_option("screen_direction") == "flat3" - or cfg.get_option("screen_direction") == "straight" - ): - flip_alt_offset = True - else: - flip_alt_offset = False + + mount_type = cfg.get_option("mount_type") + logger.debug(f"mount_type = {mount_type}") + + # Set up dead-reckoning tracking by the IMU: + imu_dead_reckoning = ImuDeadReckoning(cfg.get_option("screen_direction")) + # imu_dead_reckoning.set_alignment(q_scope2cam) # TODO: Enable when q_scope2cam is available from alignment # This holds the last image solve position info # so we can delta for IMU updates last_image_solve = None last_solve_time = time.time() + while True: state_utils.sleep_for_framerate(shared_state) @@ -107,6 +81,7 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa solved = copy.deepcopy(last_image_solve) # If no successful solve yet, keep initial solved dict + # TODO: Create a function to update solve? # Update solve metadata (always needed for auto-exposure) for key in [ "Matches", @@ -121,58 +96,6 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa if next_image_solve.get("RA") is not None: solved.update(next_image_solve) - # Recalculate Alt/Az for NEW successful solve - location = shared_state.location() - dt = shared_state.datetime() - - if location and dt: - # We have position and time/date and a valid solve! - calc_utils.sf_utils.set_location( - location.lat, - location.lon, - location.altitude, - ) - alt, az = calc_utils.sf_utils.radec_to_altaz( - solved["RA"], - solved["Dec"], - dt, - ) - solved["Alt"] = alt - solved["Az"] = az - - alt, az = calc_utils.sf_utils.radec_to_altaz( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - solved["camera_center"]["Alt"] = alt - solved["camera_center"]["Az"] = az - - # Experimental: For monitoring roll offset - # Estimate the roll offset due misalignment of the - # camera sensor with the Pole-to-Source great circle. - solved["Roll_offset"] = estimate_roll_offset(solved, dt) - # Find the roll at the target RA/Dec. Note that this doesn't include the - # roll offset so it's not the roll that the PiFinder camear sees but the - # roll relative to the celestial pole - roll_target_calculated = calc_utils.sf_utils.radec_to_roll( - solved["RA"], solved["Dec"], dt - ) - # Compensate for the roll offset. This gives the roll at the target - # as seen by the camera. - solved["Roll"] = roll_target_calculated + solved["Roll_offset"] - - # calculate roll for camera center - roll_target_calculated = calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - # Compensate for the roll offset. This gives the roll at the target - # as seen by the camera. - solved["camera_center"]["Roll"] = ( - roll_target_calculated + solved["Roll_offset"] - ) # For failed solves, preserve ALL position data from previous solve # Don't recalculate from GPS (causes drift from GPS noise) @@ -196,108 +119,231 @@ def integrator(shared_state, solver_queue, console_queue, log_queue, is_debug=Fa # Push all camera solves (success and failure) immediately # This ensures auto-exposure sees Matches=0 for failed solves shared_state.set_solution(solved) - - # Use IMU dead-reckoning from the last camera solve: - # Check we have an alt/az solve, otherwise we can't use the IMU - elif solved["Alt"]: + shared_state.set_solve_state(True) + + # We have a new image solve: Use plate-solving for RA/Dec + update_plate_solve_and_imu(imu_dead_reckoning, solved) + + # TODO: main also calculates (alt, az) for target & camera center. + # Don't think this is needed because they are done by the update functions? + elif imu_dead_reckoning.tracking: + # Previous plate-solve exists so use IMU dead-reckoning from + # the last plate solved coordinates. imu = shared_state.imu() if imu: - dt = shared_state.datetime() - if last_image_solve and last_image_solve["Alt"]: - # If we have alt, then we have a position/time - - # calc new alt/az - lis_imu = last_image_solve["imu_pos"] - imu_pos = imu["pos"] - if imu_moved(lis_imu, imu_pos): - alt_offset = imu_pos[IMU_ALT] - lis_imu[IMU_ALT] - if flip_alt_offset: - alt_offset = ((alt_offset + 180) % 360 - 180) * -1 - else: - alt_offset = (alt_offset + 180) % 360 - 180 - solved["Alt"] = (last_image_solve["Alt"] - alt_offset) % 360 - solved["camera_center"]["Alt"] = ( - last_image_solve["camera_center"]["Alt"] - alt_offset - ) % 360 - - az_offset = imu_pos[IMU_AZ] - lis_imu[IMU_AZ] - az_offset = (az_offset + 180) % 360 - 180 - solved["Az"] = (last_image_solve["Az"] + az_offset) % 360 - solved["camera_center"]["Az"] = ( - last_image_solve["camera_center"]["Az"] + az_offset - ) % 360 - - # N.B. Assumes that location hasn't changed since last solve - # Turn this into RA/DEC - ( - solved["RA"], - solved["Dec"], - ) = calc_utils.sf_utils.altaz_to_radec( - solved["Alt"], solved["Az"], dt - ) - # Calculate the roll at the target RA/Dec and compensate for the offset. - solved["Roll"] = ( - calc_utils.sf_utils.radec_to_roll( - solved["RA"], solved["Dec"], dt - ) - + solved["Roll_offset"] - ) - - # Now for camera centered solve - ( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - ) = calc_utils.sf_utils.altaz_to_radec( - solved["camera_center"]["Alt"], - solved["camera_center"]["Az"], - dt, - ) - # Calculate the roll at the target RA/Dec and compensate for the offset. - solved["camera_center"]["Roll"] = ( - calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], - solved["camera_center"]["Dec"], - dt, - ) - + solved["Roll_offset"] - ) - - solved["solve_time"] = time.time() - solved["solve_source"] = "IMU" + update_imu(imu_dead_reckoning, solved, last_image_solve, imu) # Push IMU updates only if newer than last push - # (Camera solves already pushed above at line 185) + # (Camera solves already pushed above at line ~185) # TODO: Line numbers may have changed in merge if ( solved["RA"] and solved["solve_time"] > last_solve_time - and solved.get("solve_source") == "IMU" + #and solved["solve_source"] == "IMU" ): - last_solve_time = time.time() - # Calculate constellation for IMU dead-reckoning position - solved["constellation"] = calc_utils.sf_utils.radec_to_constellation( - solved["RA"], solved["Dec"] + last_solve_time = time.time() # TODO: solve_time is ambiguous because it's also used for IMU dead-reckoning + + # Set location for roll and altaz calculations. + # TODO: Is it necessary to set location? + # TODO: Altaz doesn't seem to be required for catalogs when in + # EQ mode? Could be disabled in future when in EQ mode? + location = shared_state.location() + dt = shared_state.datetime() + if location: + calc_utils.sf_utils.set_location( + location.lat, location.lon, location.altitude + ) + + # Set the roll so that the chart is displayed appropriately for the mount type + solved["Roll"] = get_roll_by_mount_type( + solved["RA"], solved["Dec"], location, dt, mount_type ) + # Update remaining solved keys + # Calculate constellation for current position + solved["constellation"] = ( + calc_utils.sf_utils.radec_to_constellation( + solved["RA"], solved["Dec"] + ) + ) # TODO: Can the outer brackets be omitted? + + # Set Alt/Az because it's needed for the catalogs for the + # Alt/Az mount type. TODO: Can this be moved to the catalog? + dt = shared_state.datetime() + if location and dt: + solved["Alt"], solved["Az"] = calc_utils.sf_utils.radec_to_altaz( + solved["RA"], solved["Dec"], dt + ) + # Push IMU update shared_state.set_solution(solved) shared_state.set_solve_state(True) + except EOFError: logger.error("Main no longer running for integrator") -def estimate_roll_offset(solved, dt): +# ======== Wrapper and helper functions =============================== + + +def update_plate_solve_and_imu(imu_dead_reckoning: ImuDeadReckoning, solved: dict): """ - Estimate the roll offset due to misalignment of the camera sensor with - the mount/scope's coordinate system. The offset is calculated at the - center of the camera's FoV. + Wrapper for ImuDeadReckoning.update_plate_solve_and_imu() to + interface angles in degrees to radians. + + This updates the pointing model with the plate-solved coordinates and the + IMU measurements which are assumed to have been taken at the same time. + """ + if (solved["RA"] is None) or (solved["Dec"] is None): + return # No update + else: + # Successfully plate solved & camera pointing exists + if solved["imu_quat"] is None: + q_x2imu = quaternion.quaternion(np.nan) + else: + q_x2imu = solved["imu_quat"] # IMU measurement at the time of plate solving + + # Update: + solved_cam = RaDecRoll() + solved_cam.set_from_deg( + solved["camera_center"]["RA"], + solved["camera_center"]["Dec"], + solved["camera_center"]["Roll"], + ) + imu_dead_reckoning.update_plate_solve_and_imu(solved_cam, q_x2imu) + + # Set alignment. TODO: Do this once at alignment. Move out of here. + set_alignment(imu_dead_reckoning, solved) + - To calculate the roll with offset: roll = calculated_roll + roll_offset +def set_alignment(imu_dead_reckoning: ImuDeadReckoning, solved: dict): """ - # Calculate the expected roll at the camera center given the RA/Dec of - # of the camera center. - roll_camera_calculated = calc_utils.sf_utils.radec_to_roll( - solved["camera_center"]["RA"], solved["camera_center"]["Dec"], dt + Set alignment. + TODO: Do this once at alignment + """ + # RA, Dec of camera center:: + solved_cam = RaDecRoll() + solved_cam.set_from_deg( + solved["camera_center"]["RA"], + solved["camera_center"]["Dec"], + solved["camera_center"]["Roll"], ) - roll_offset = solved["camera_center"]["Roll"] - roll_camera_calculated - return roll_offset + # RA, Dec of target (where scope is pointing): + solved["Roll"] = 0 # Target roll isn't calculated by Tetra3. Set to zero here + solved_scope = RaDecRoll() + solved_scope.set_from_deg(solved["RA"], solved["Dec"], solved["Roll"]) + + # Set alignment in imu_dead_reckoning + imu_dead_reckoning.set_alignment(solved_cam, solved_scope) + + +def update_imu( + imu_dead_reckoning: ImuDeadReckoning, + solved: dict, + last_image_solve: dict, + imu: dict, +): + """ + Updates the solved dictionary using IMU dead-reckoning from the last + solved pointing. + """ + if not (last_image_solve and imu_dead_reckoning.tracking): + return # Need all of these to do IMU dead-reckoning + + assert isinstance( + imu["quat"], quaternion.quaternion + ), "Expecting quaternion.quaternion type" # TODO: Can be removed later + q_x2imu = imu["quat"] # Current IMU measurement (quaternion) + + # When moving, switch to tracking using the IMU + angle_moved = qt.get_quat_angular_diff(last_image_solve["imu_quat"], q_x2imu) + if angle_moved > IMU_MOVED_ANG_THRESHOLD: + # Estimate camera pointing using IMU dead-reckoning + logger.debug( + "Track using IMU: Angle moved since last_image_solve = " + "{:}(> threshold = {:}) | IMU quat = ({:}, {:}, {:}, {:})".format( + np.rad2deg(angle_moved), + np.rad2deg(IMU_MOVED_ANG_THRESHOLD), + q_x2imu.w, + q_x2imu.x, + q_x2imu.y, + q_x2imu.z, + ) + ) + + # Dead-reckoning using IMU + imu_dead_reckoning.update_imu(q_x2imu) # Latest IMU measurement + + # Store current camera pointing estimate: + cam_eq = imu_dead_reckoning.get_cam_radec() + ( + solved["camera_center"]["RA"], + solved["camera_center"]["Dec"], + solved["camera_center"]["Roll"], + ) = cam_eq.get_deg(use_none=True) + + # Store the current scope pointing estimate + scope_eq = imu_dead_reckoning.get_scope_radec() + solved["RA"], solved["Dec"], solved["Roll"] = scope_eq.get_deg(use_none=True) + + solved["solve_time"] = time.time() + solved["solve_source"] = "IMU" + + # Logging for states updated in solved: + logger.debug( + "IMU update: scope: RA: {:}, Dec: {:}, Roll: {:}".format( + solved["RA"], solved["Dec"], solved["Roll"] + ) + ) + logger.debug( + "IMU update: camera_center: RA: {:}, Dec: {:}, Roll: {:}".format( + solved["camera_center"]["RA"], + solved["camera_center"]["Dec"], + solved["camera_center"]["Roll"], + ) + ) + + +def get_roll_by_mount_type( + ra_deg: float, # Right Ascension of the target in degrees + dec_deg: float, # Declination of the target in degrees + location, # astropy EarthLocation object or None + dt: datetime.datetime, # datetime.datetime object or None + mount_type: str # "Alt/Az" or "EQ" +) -> float: + """ + Returns the roll (in degrees) depending on the mount type so that the chart + is displayed appropriately for the mount type. The RA and Dec of the target + should be provided (in degrees). + + * Alt/Az mount: Display the chart in the horizontal coordinate so that up + in the chart points to the Zenith. + * EQ mount: Display the chart in the equatorial coordinate system with the + NCP up so roll = 0. + + Assumes that location has already been set in calc_utils.sf_utils. + """ + if mount_type == "Alt/Az": + # Altaz mounts: Display chart in horizontal coordinates + if location and dt: + # We have location and time/date (and assume that location has been set) + # Roll at the target RA/Dec in the horizontal frame + roll_deg = calc_utils.sf_utils.radec_to_roll(ra_deg, dec_deg, dt) + else: + # No position or time/date available, so set roll to 0.0 + roll_deg = 0.0 + elif mount_type == "EQ": + # EQ-mounts: Display chart with NCP up so roll = 0.0 + roll_deg = 0.0 + else: + logger.error(f"Unknown mount type: {mount_type}. Cannot set roll.") + roll_deg = 0.0 + + # If location is available, adjust roll for hemisphere: + # Altaz: North up in northern hemisphere, South up in southern hemisphere + # EQ mounts: NCP up in northern hemisphere, SCP up in southern hemisphere + if location: + if location.lat < 0.0: + roll_deg += 180.0 # Southern hemisphere + + return roll_deg diff --git a/python/PiFinder/pointing_model/README.md b/python/PiFinder/pointing_model/README.md new file mode 100644 index 000000000..2dff9d9b9 --- /dev/null +++ b/python/PiFinder/pointing_model/README.md @@ -0,0 +1,345 @@ +README: IMU support prototyping +=============================== + +> The first part of this README is temporary for the prototyping phase of the IMU support. + +# TODO + +>Remove this before release! + +See Discord thread: https://discord.com/channels/1087556380724052059/1406599296002035895 + + +For future: +* Calibration to align IMU frame to camera frame to remove the residual error. +* Update imu_pi.py +* Set alignment once at alignment rather than calculating it every loop? + +Done: +* Fails nox +* Support other PiFinder types +* Adjust Roll depending on mount_type for charts +* Lint +* Type hints for integrator.py +* Use RaDecRoll class --> Done. Need to test. +* Go through TODOs in code +* Doesn't pass Nox +* Doesn't run wih v3 Flat. Issue in main branch? (reported by grimaldi: 20 Aug 2025 https://discord.com/channels/1087556380724052059/1406599296002035895/1407813498163167345) +* In EQ mode flickers between 0° and 359°. This is also in the main branch. +* Issue in the default requirements? Error (not related to the IMU changes) + when I try to create a new env using requirements.txt. Maybe I can't just + create a new environment using pip? +* Clean up README + * Remove instruction on venv + * Remove descriptions of frames +* Brickbots tested Right version got strange estimates like the axes were swapped around --> Was due to an incorrect rotation `q_imu2cam` for the "right" PiFinder. Check the others. --> Fixed 25 Oct 2025 (6f7e17b) + +# Sky test log + +>Remove this before release! + +## 20251030: b8e09ff (tested 31 Oct) + +* v2 Flat. +* Issue with chart as before. Switches over in the E at around RA=+6hrs and +also in the West around 22hrs? + +## 20251021: 7519b (tested 24 Oct) + +* v2 Flat. +* Pointings in RA, Dec were OK but in Chart mode, the chart rotated 180 degrees +as I crossed to the East of around Mirfak in Perseus with the result that +slewing to the East showed up as moving to the right. +* Brickbots tested Right version got strange estimates like the axes were + swapped around. + +## 20251001: c6422 (tested 5 Oct) + +* v2 Flat. Exposure 0.4s. +* Tested in altaz mode. +* 97% full moon. Zenity NELM 3. +* Worked fine. When moved quickly to a target, the IMU mode got it to within +1-2° and then it snapped to the pointing from the plate solve and stayed there. +I didn't see any jiggling of the SkySafari cursor when zoomed in at a scale of +5° FOV. +* Changes since last test: Cleaning up & refactoring. EQ mode angle changed to + +/-. Numpy version updated. + +## 20250831: af358e (tested 5/6 Aug) + +* Tested on altaz mount in altaz mode. +* Seemed ok. Didn't check chart display. +* Changes have mainly been refactoring & linting. + +## 20250819: 700f77c (tested 19/20 Aug) + +* Tested on altaz mount in altaz & eq mode +* OK: + * Changed chart display so that altaz is in horizontal frame and EQ mode displays in equatorial + coordinates. This appears to work. + * Tracking on chart and SkySafari OK. +* Issues: + * Catalog crashes in altaz mode (ok in EQ mode). Probably because we don't calculate altaz in integrator.py? Same behaviour under test mode so we could do desktop tests. + + +## 20250817: 5cf8aae + +* Tested on altaz and eq mounts +* **altaz:** Tracked fine. When the PiFinder was non-upright, I got the + feeling it tended to jump after an IMU track and got a plate-solve. This + wasn't seen when the PiFinder was upright. When non-upright, the crosshair + moved diagonally when the scope was moved in az or alt. The rotated + constellations in the chart were hard to make out. +* **EQ:** Seemed to work fine but I'm not experienced with EQ. The display on + SkySafari showed RA movement along the horizontal direction and Dec along + the vertical. This seemed to make sense. + +# Installation & set up + +## Install additional packages + +This branch needs the `numpy.quaternion` package. To do this, run +`pifinder_post_update.sh`, which will install the new package and updage +`numpy`. + +PiFinder can be run from the command line as usual: + +```bash +cd ~/PiFinder/python +python3 -m PiFinder.main +``` + +# Theory + +## Quaternion rotation + +A quaternion is defined by + +$$\mathbf{q} = \cos(\theta/2) + (u_x \mathbf{i} + u_y \mathbf{j} + u_z +\mathbf{k}) \sin(\theta / 2)$$ + +This can be interpreted as a rotation around the axis $\mathbf{u}$ by an angle +$\theta$ in the clockwise direction when looking along $\mathbf{u}$ from the +origin. Alternatively, using the right-hand corkscrew rule, the right thumb +points along the axis and fingers in the direction of positive rotation. + +We can express a quaternion as + +$$\mathbf{q} = (w, x, y, z)$$ + +where $w$ is the scalar part and $(x, y, z)$ is the vector part. We will use +the *scalar-first* convention used by Numpy Quaternion. + +A vector can be rotated by the quaternion $\mathbf{q}$ by defining the vector +as a pure quaternion $\mathbf{p}$ (where the scalar part is zero) as follows: + +$\mathbf{p^\prime} = \mathbf{qpq}^{-1}$ + + +### Numpy quaternion + +In Numpy Quaternion, we can create a quaternion using + +```python +q = quaternion.quaternion(w, x, y, z) +``` + +Quaternion multiplications are simply `q1 * q2`. + +The inverse (or conjugate) is given by `q.conj()`. + + +### Intrinsic and extrinsic rotation + +Intrinsic rotation of $q_0$ followed by $q_1$ + +$$q_{new} = q_0 q_1$$ + +For an extrinsic rotation of $q_0$ followed by $q_1$, left multiply + +$$q_{new} = q_1 q_0$$ + + +## Coordinate frames + +### Coordinate frame definitions + +We define the following reference frames: + +#### Equatorial coordinate system +* Centered around the center of the Earth with the $xy$ plane running through + the Earths' equator. $+z$ points to the north pole and $+x$ to the Vernal + equinox. + +#### Horizontal coordinate system +* Centred around the observer. We will use the convention: +* $x$ points South, $y$ to East and $z$ to the zenith. + +#### Scope frame +* +z is boresight. +* On an altaz mount, we define +y as the vertical direction of the scope and +x + as the horizontal direction to the left when looking along the boresight. +* In the ideal case, the Scope frame is assumed to be the same as the Gimbal + frame. In reality, there may be errors due to mounting or gravity. + +#### Camera frame +* The camera frame describes the pointing of the PiFinder's camera. There will + be an offset between the camera and the scope. +* $+z$ is the boresight of the camera, $+y$ and $+x$ are respectively the + vertical and horizontal (to the left) directions of the camera. + +#### IMU frame +* The IMU frame is the local coordinates that the IMU outputs the data in. +* The diagram below illustrates the IMU coordinate frame for the v2 PiFinder + with the Adafruit BNO055 IMU. + +### IMU and camera coordinate frames + +To use the IMU for dead-reckoning, we need to know the transformation between +the IMU's own coordinate frame and the PiFinder's camera coordinate frame +(which we use as the PiFinder's reference coordinate frame). + +The picture below illustrate the IMU and camera coordinates for the v2 flat +version of the PiFinder. For each type, we need to work out the quaternion +rotation `q_imu2cam` that rotates the IMU frame to the camera frame. + +![Image](docs/PiFinder_Flat_bare_PCB_camera_coords.jpg) + +The transformations will be approximate and there will be small errors in +`q_imu2cam` due to mechanical tolerances. These errors will contribute to the +tracking error between the plate solved coordinates and the IMU dead-reckoning. + +### Roll + +The roll (as given by Tetra3) is defined as the rotation of the north pole +relative to the camera image's "up" direction ($+y$). A positive roll angle +means that the pole is counter-clockwise from image "up" when looking out +towards the sky (i.e. towards East). + +### Telescope coordinate transformations + +We will use quaternions to describe rotations. In our convention, the +orientation of the camera frame in equatorial frame is written as `q_eq2cam`. +This is the rotation from the EQ frame's origin to the camera frame. We can +also write the rotation from the camera frame to the scope frame as +`q_cam2scope`. The two quaternions can be multiplied to describe the equitorial +coordinates of the scope pointing by + + +```python +q_eq2cam * q_cam2scope = q_eq2scope +``` + +Note that this convention makes it clear when applying intrinsic rotations (right-multiply). + +The Mount and Gimbal frames are not used in the current implementation but this +framework could be used to extend the implementation to control the mount. For +example, `q_mnt2gimb` depends on the gimbal angles, which is what we can +control to move the scope. + +## Coordinate frame transformation + +We will use the equatorial frame as the reference frame. The goal is determine +the scope pointing in RA and Dec. The pointing of the scope relative to the +equatorial frame can be described by quaternion $q_{eq2scope}$. + +The PiFinder uses the coordinates from plate-solving but this is at a low rate +and plate-solving may not succeed when the scope is moving so the IMU +measurements can be used to infer the pointing between plate-solving by +dead-reckoning. + +### Plate solving + +Plate-solving returns the pointing of the PiFinder camera in (RA, Dec, Roll) +coordinates. The quaternion rotation of the camera pointing relative to the +equatorial frame for time step $k$ is given by $q_{eq2cam}(k)$ and the scope +pointing is give by, + +$$q_{eq2scope}(k) = q_{eq2cam}(k) \; q_{cam2scope}$$ + +We use the PiFinder's camera frame as the reference because plate solving is +done relative to the camera frame. $q_{cam2scope}$ is the quaternion that +represents the alignment offset between the PiFinder camera frame and the scope +frame + +### Alignment + +The alignment offset between the PiFinder camera frame and the scope frame is +determined during alignment of the PiFinder with the scope and is assumed to be +fixed. The goal of alignment is to determine the quaternion $q_{cam2scope}$. + +During alignment, the user user selects the target seen in the center of the +eyepiece, which gives the (RA, Dec) of the scope pointing but not the roll. We +can assume some arbitrary roll value (say roll = 0) and get $q_{eq2scope}$. At +the same time, plate solving measures the (RA, Dec, Roll) at the camera center +or $q_{eq2cam}$. We can express the relation by, + +$$q_{eq2scope} = q_{eq2cam} \; q_{cam2scope}$$ + +Rearranging this gives, + +$$q_{cam2scope} = q_{eq2cam}^{-1} \; q_{eq2scope}$$ + +Note that for unit quaternions, we can also use the conjugate $q^*$ instead of +$q^{-1}$, because the conjugate is slightly faster to compute. + +Roll returned by plate-solving is not relevant for pointing and it can be +arbitrary but it is needed for full three degrees-of-freedom dead-reckoning by +the IMU. + +### Dead-reckoning + +Between plate solving, the IMU extrapolates the scope orientation by dead +reckoning. Suppose that we want to use the IMU measurement at time step $k$ to +estimate the scope pointing; + +```python +q_eq2scope(k) = q_eq2cam(k-m) * q_cam2imu * q_x2imu(k-m).conj() * q_x2imu(k) * q_imu2cam * q_cam2scope +``` + +Where +1. `k` represents the current time step and `k-m` represents the time step + where we had a last solve. +2. `q_x2imu(k)` is the current IMU measurement quaternion w.r.t its own + drifting reference frame `x`. +3. Note that the quaternion `q_x2imu(k-m).conj() * q_x2imu(k)` rotates the IMU + body from the orientation in the last solve (at time step `k-m`) to to the + current orientation (at time step `k`). +4. `q_cam2imu = q_imu2cam.conj()` is the alignment of the IMU to the camera and +depends on the PiFinder configuration. There will be some error due to +mechanical tolerances which will propagate to the pointing error when using the +IMU. + +We can pre-compute the first three terms after plate solving at time step +`k-m`, which corresponds to the quaternion rotation from the `eq` frame to the +IMU's reference frame `x`. + +```python +q_eq2x(k-m) = q_eq2cam(k-m) * q_cam2imu * q_x2imu(k-m).conj() +``` + +## Potential future improvements + +A potential next step could be to use a Kalman filter framework to estimate the +pointing. Some of the benefits are: + +* Smoother, filtered pointing estimate. +* Improves the accuracy of the pointing estimate. Accuracy may be more + beneficial when using the PiFinder estimate to control driven mounts. +* Potentially enable any generic IMU (with gyro and accelerometer) to be used + without internal fusion FW, which tends to add to the BOM cost. +* If required, could take fewer plate-solving frames by only triggering a plate +solve when the uncertainty of the Kalman filter estimate based on IMU +dead-reckoning exceeds some limit. This can reduce power consumption and allow +for a cheaper, less powerful computing platform to be used. + +The accuracy improvement will likely come from the following sources: + +* Filtering benefits from the averaging effects of using multiple measurements. +* The Kalman filter will estimate the accelerometer and gyro bias online. The +calibration will be done in conjunction with the plate-solved measurements so +it will be better than an IMU-only calibration. +* The orientation of the IMU to the camera frame, $q_{imu2cam}$, has errors +because of mechanical tolerances. The Kalman filter will calibrate for this +online. This will improve the accuracy and enable other non-standard +form-factors. diff --git a/python/PiFinder/pointing_model/__init__.py b/python/PiFinder/pointing_model/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/PiFinder/pointing_model/astro_coords.py b/python/PiFinder/pointing_model/astro_coords.py new file mode 100644 index 000000000..2304825ae --- /dev/null +++ b/python/PiFinder/pointing_model/astro_coords.py @@ -0,0 +1,147 @@ +""" +Various astronomical coordinates functions +""" + +from dataclasses import dataclass +import numpy as np +import quaternion +from typing import Union # When updated to Python 3.10+, remove and use new type hints + +import PiFinder.pointing_model.quaternion_transforms as qt + + +@dataclass +class RaDecRoll: + """ + Data class for equatorial coordinates defined by (RA, Dec, Roll). This + makes it easier for interfacing and convert between radians and degrees. + + The set methods allow values to be float or None but internally, None will + be stored as np.nan so that the type is consistent. the get methods will + return None if the value is np.nan. + + NOTE: All angles are in radians. + """ + + ra: float = np.nan + dec: float = np.nan + roll: float = np.nan + is_set = False + + def reset(self): + """Reset to unset state""" + self.ra = np.nan + self.dec = np.nan + self.roll = np.nan + self.is_set = False + + def set( + self, ra: Union[float, None], dec: Union[float, None], roll: Union[float, None] + ): + """Set using radians""" + self.ra = ra if ra is not None else np.nan + self.dec = dec if dec is not None else np.nan + self.roll = roll if roll is not None else np.nan + self.is_set = True + + def set_from_deg( + self, + ra_deg: Union[float, None], + dec_deg: Union[float, None], + roll_deg: Union[float, None], + ): + """Set using degrees""" + ra = np.deg2rad(ra_deg) if ra_deg is not None else np.nan + dec = np.deg2rad(dec_deg) if dec_deg is not None else np.nan + roll = np.deg2rad(roll_deg) if roll_deg is not None else np.nan + + self.set(ra, dec, roll) + + def set_from_quaternion(self, q_eq: quaternion.quaternion): + """ + Set from a quaternion rotation relative to the Equatorial frame. + Re-using code from quaternion_transforms.q_eq2radec. + """ + ra, dec, roll = qt.q_eq2radec(q_eq) + self.set(ra, dec, roll) + + def get( + self, use_none=False + ) -> tuple[Union[float, None], Union[float, None], Union[float, None]]: + """ + Returns (ra, dec, roll) in radians. If use_none is True, returns None + for any unset (nan) values. + """ + if use_none: + ra = self.ra if not np.isnan(self.ra) else None + dec = self.dec if not np.isnan(self.dec) else None + roll = self.roll if not np.isnan(self.roll) else None + else: + ra, dec, roll = self.ra, self.dec, self.roll + + return ra, dec, roll + + def get_deg( + self, use_none=False + ) -> tuple[Union[float, None], Union[float, None], Union[float, None]]: + """ + Returns (ra, dec, roll) in degrees. If use_none is True, returns None + for any unset (nan) values. + """ + if use_none: + ra = np.rad2deg(self.ra) if not np.isnan(self.ra) else None + dec = np.rad2deg(self.dec) if not np.isnan(self.dec) else None + roll = np.rad2deg(self.roll) if not np.isnan(self.roll) else None + else: + ra, dec, roll = ( + np.rad2deg(self.ra), + np.rad2deg(self.dec), + np.rad2deg(self.roll), + ) + + return ra, dec, roll + + +def initialized_solved_dict() -> dict: + """ + Returns an initialized 'solved' dictionary with cooridnate and other + information. + + TODO: Update solver_main.py with this + TODO: use RaDecRoll class for the RA, Dec, Roll coordinates here? + TODO: "Alt" and "Az" could be removed but seems to be required by catalogs? + """ + solved = { + # RA, Dec, Roll [deg] of the scope at the target pixel + "RA": None, + "Dec": None, + "Roll": None, + # RA, Dec, Roll [deg] solved at the center of the camera FoV + # update by the IMU in the integrator + "camera_center": { + "RA": None, + "Dec": None, + "Roll": None, + "Alt": None, # NOTE: Altaz needed by catalogs for altaz mounts + "Az": None, + }, + # RA, Dec, Roll [deg] from the camera, not updated by IMU in integrator + "camera_solve": { + "RA": None, + "Dec": None, + "Roll": None, + }, + "imu_quat": None, # IMU quaternion as numpy quaternion (scalar-first) + # Alt, Az [deg] of scope: + "Alt": None, + "Az": None, + # Diagnostics: + "solve_source": None, + "solve_time": None, + "cam_solve_time": 0, + "last_solve_attempt": 0, # Timestamp of last solve attempt - tracks exposure_end of last processed image + "last_solve_success": None, # Timestamp of last successful solve + "constellation": None, + } + + return solved diff --git a/python/PiFinder/pointing_model/docs/PiFinder_Flat_bare_PCB_camera_coords.jpg b/python/PiFinder/pointing_model/docs/PiFinder_Flat_bare_PCB_camera_coords.jpg new file mode 100644 index 000000000..7aac61044 Binary files /dev/null and b/python/PiFinder/pointing_model/docs/PiFinder_Flat_bare_PCB_camera_coords.jpg differ diff --git a/python/PiFinder/pointing_model/imu_dead_reckoning.py b/python/PiFinder/pointing_model/imu_dead_reckoning.py new file mode 100644 index 000000000..382df5051 --- /dev/null +++ b/python/PiFinder/pointing_model/imu_dead_reckoning.py @@ -0,0 +1,237 @@ +""" +IMU dead-reckoning extrapolates the scope pointing from the last plate-solved +coordinate using the IMU measurements. + +See quaternion_transforms.py for conventions used for quaternions. + +NOTE: All angles are in radians. +""" + +import numpy as np +import quaternion + +from PiFinder.pointing_model.astro_coords import RaDecRoll +import PiFinder.pointing_model.quaternion_transforms as qt + + +class ImuDeadReckoning: + """ + Use the plate-solved coordinates and IMU measurements to estimate the + pointing using plate solving when available or dead-reckoning using the IMU + when plate solving isn't available (e.g. when the scope is moving or + between frames). + + For an explanation of the theory and conventions used, see + PiFinder/pointing_model/README.md. + + This class uses the Equatorial frame as the reference and expects + plate-solved coordinates in (ra, dec). + + All angles are in radians. None is not allowed as inputs (use np.nan). + + EXAMPLE: + # Set up: + imu_dead_reckoning = ImuDeadReckoning('flat') + imu_dead_reckoning.set_alignment(solved_cam, solved_scope) + + # Update with plate solved and IMU data: + imu_dead_reckoning.update_plate_solve_and_imu(solved_cam, q_x2imu) + + # Dead-reckoning using IMU + imu_dead_reckoning.update_imu(q_x2imu) + """ + + # Alignment: + q_cam2scope: quaternion.quaternion + # IMU orientation: + q_imu2cam: quaternion.quaternion + q_cam2imu: quaternion.quaternion + # Putting them together: + q_imu2scope: quaternion.quaternion + + # The poinging of the camera and scope frames wrt the Equatorial frame. + # These get updated by plate solving and IMU dead-reckoning. + q_eq2cam: quaternion.quaternion + + # True when q_eq2cam is estimated by IMU dead-reckoning. + # False when set by plate solving + dead_reckoning: bool = False + tracking: bool = False # True when previous plate solve exists and is tracking + + # The IMU's unkonwn drifting reference frame X. This is solved for + # every time we have a simultaneous plate solve and IMU measurement. + q_eq2x: quaternion.quaternion = quaternion.quaternion(np.nan) # nan means not set + + def __init__(self, screen_direction: str): + """ """ + # IMU-to-camera orientation. Fixed by PiFinder type + self._set_screen_direction(screen_direction) + + def set_alignment(self, solved_cam: RaDecRoll, solved_scope: RaDecRoll): + """ + Set the alignment between the PiFinder camera center and the scope + pointing. + + INPUTS: + solved_cam: Equatorial coordinate of the camera center at alignment. + solved_scope: Equatorial coordinate of the scope center at alignement. + """ + # Calculate q_scope2cam (alignment) + q_eq2cam = qt.radec2q_eq(solved_cam.ra, solved_cam.dec, solved_cam.roll) + q_eq2scope = qt.radec2q_eq(solved_scope.ra, solved_scope.dec, solved_scope.roll) + q_scope2cam = q_eq2scope.conjugate() * q_eq2cam + + # Set the alignmen attributes: + self.q_cam2scope = q_scope2cam.normalized().conj() + self.q_imu2scope = self.q_imu2cam * self.q_cam2scope + + def update_plate_solve_and_imu( + self, + solved_cam: RaDecRoll, + q_x2imu: quaternion.quaternion, + ): + """ + Update the state with the az/alt measurements from plate solving in the + camera frame. If the IMU measurement (which should be taken at the same + time) is available, q_x2imu (the unknown drifting reference frame) will + be solved for. + + INPUTS: + solved_cam: RA/Dec/Roll of the camera pointing from plate solving. + q_x2imu: [quaternion] Raw IMU measurement quaternions. This is the IMU + frame orientation wrt unknown drifting reference frame X. + """ + if not solved_cam.is_set: + return # No update + + # Update plate-solved coord: Camera frame relative to the Equatorial + # frame where the +y camera frame (i.e. "up") points to the North + # Celestial Pole (NCP) -- i.e. zero roll offset: + self.q_eq2cam = qt.radec2q_eq(solved_cam.ra, solved_cam.dec, solved_cam.roll) + self.dead_reckoning = False # Using plate solve, no dead_reckoning + + # Update IMU: Calculate the IMU's unknown reference frame X using the + # plate solved coordinates and IMU measurements taken from the same + # time. If the IMU measurement isn't provided (i.e. None), the existing + # q_hor2x will continue to be used. + if not np.isnan(q_x2imu): + self.q_eq2x = self.q_eq2cam * self.q_cam2imu * q_x2imu.conj() + self.q_eq2x = self.q_eq2x.normalized() + self.tracking = True # We have a plate solve and IMU measurement + + def update_imu(self, q_x2imu: quaternion.quaternion): + """ + Update the state with the raw IMU measurement. Does a dead-reckoning + estimate of the camera and scope pointing. + + INPUTS: + q_x2imu: Quaternion of the IMU orientation w.r.t. an unknown and drifting + reference frame X used by the IMU. + """ + if not np.isnan(self.q_eq2x): + # Dead reckoning estimate by IMU if q_hor2x has been estimated by a + # previous plate solve. + self.q_eq2cam = self.q_eq2x * q_x2imu * self.q_imu2cam + self.q_eq2cam = self.q_eq2cam.normalized() + + self.q_eq2scope = self.q_eq2cam * self.q_cam2scope + self.q_eq2scope = self.q_eq2scope.normalized() + + self.dead_reckoning = True + + def get_cam_radec(self) -> RaDecRoll: + """ + Returns the (ra, dec, roll) of the camera centre and a Boolean + dead_reckoning to indicate if the estimate is from dead-reckoning + (True) or from plate solving (False). + """ + ra_dec_roll = RaDecRoll() + ra_dec_roll.set_from_quaternion(self.q_eq2cam) + + return ra_dec_roll + + def get_scope_radec(self) -> RaDecRoll: + """ + Returns the (ra, dec, roll) of the scope and a Boolean dead_reckoning + to indicate if the estimate is from dead-reckoning (True) or from plate + solving (False). + """ + ra_dec_roll = RaDecRoll() + ra_dec_roll.set_from_quaternion(self.q_eq2scope) + + return ra_dec_roll + + def reset(self): + """ + Resets the internal state. + """ + self.q_eq2x = None + self.tracking = False + + def _set_screen_direction(self, screen_direction: str): + """ + Sets the screen direction which determines the fixed orientation between + the IMU and camera (q_imu2cam). + """ + self.q_imu2cam = get_screen_direction_q_imu2cam(screen_direction) + self.q_cam2imu = self.q_imu2cam.conj() + + +def get_screen_direction_q_imu2cam(screen_direction: str) -> quaternion.quaternion: + """ + Returns the quaternion that rotates the IMU frame to the camera frame + based on the screen direction. + + INPUTS: + screen_direction: "flat" or "upright" + + RETURNS: + q_imu2cam: Quaternion that rotates the IMU frame to the camera frame. + """ + if screen_direction == "left": + # Left: + # Rotate 90° around x_imu so that z_imu' points along z_camera + q1 = qt.axis_angle2quat([1, 0, 0], np.pi / 2) + # Rotate 90° around z_imu' to align with the camera cooridnates + q2 = qt.axis_angle2quat([0, 0, 1], np.pi / 2) + q_imu2cam = (q1 * q2).normalized() + elif screen_direction == "right": + # Right: + # Rotate -90° around x_imu so that z_imu' points along z_camera + q1 = qt.axis_angle2quat([1, 0, 0], -np.pi / 2) + # Rotate 90° around z_imu' to align with the camera cooridnates + q2 = qt.axis_angle2quat([0, 0, 1], np.pi / 2) + q_imu2cam = (q1 * q2).normalized() + elif screen_direction == "straight": + # Straight: + # Rotate 180° around y_imu so that z_imu' points along z_camera + q1 = qt.axis_angle2quat([0, 1, 0], np.pi) + # Rotate -90° around z_imu' to align with the camera cooridnates + q2 = qt.axis_angle2quat([0, 0, 1], -np.pi / 2) + q_imu2cam = (q1 * q2).normalized() + elif screen_direction == "flat3": + # Flat v3: + # Camera is tilted 30° further down from the screen compared to Flat v2 + # Rotate -120° around y_imu so that z_imu' points along z_camera + q1 = qt.axis_angle2quat([0, 1, 0], -np.pi * 2 / 3) + # Rotate -90° around z_imu' to align with the camera cooridnates + q2 = qt.axis_angle2quat([0, 0, 1], -np.pi / 2) + q_imu2cam = (q1 * q2).normalized() + elif screen_direction == "flat": + # Flat v2: + # Rotate -90° around y_imu so that z_imu' points along z_camera + q1 = qt.axis_angle2quat([0, 1, 0], -np.pi / 2) + # Rotate -90° around z_imu' to align with the camera cooridnates + q2 = qt.axis_angle2quat([0, 0, 1], -np.pi / 2) + q_imu2cam = (q1 * q2).normalized() + elif screen_direction == "as_bloom": + # As Dream: + # Camera points back up from the screen + # NOTE: Need to check if the orientation of the camera is correct + # Rotate +90° around z_imu to align with the camera cooridnates + # (+y_cam is along -x_imu) + q_imu2cam = qt.axis_angle2quat([0, 0, 1], +np.pi / 2) + else: + raise ValueError(f"Unsupported screen_direction: {screen_direction}") + + return q_imu2cam diff --git a/python/PiFinder/pointing_model/quaternion_transforms.py b/python/PiFinder/pointing_model/quaternion_transforms.py new file mode 100644 index 000000000..87001f70e --- /dev/null +++ b/python/PiFinder/pointing_model/quaternion_transforms.py @@ -0,0 +1,119 @@ +""" +Quaternion transformations + +For quaternions, we use the notation `q_a2b`. This represents a quaternion that +rotates frame `a` to frame `b` using intrinsic rotation (by post-multiplying +the quaternions). This notation makes makes chains of intrinsic rotations +simple and clear. For example, this gives a quaternion `q_a2c` that rotates +from frame `a` to frame `c`: + +q_a2c = q_a2b * q_a2c + +NOTE: + +* All angles are in radians. +* The quaternions use numpy quaternions and are scalar-first. +* Some of the constant quaternion terms can be speeded up by not using +trigonometric functions. +* The methods do not normalize the quaternions because this incurs a small +computational overhead. Normalization should be done manually as and when +necessary. +""" + +import numpy as np +import quaternion + + +def axis_angle2quat(axis, theta: float) -> quaternion.quaternion: + """ + Convert from axis-angle representation to a quaternion + + INPUTS: + axis: (3,) Axis of rotation (doesn't need to be a unit vector) + angle: Angle of rotation [rad]. It uses the right-hand rule; positive + when for clock-wise rotation as viewed outwards along the axis vector. + You can also point the thumb of the right hand in the direction of the + axis vector. Positive turn in the direction that the fingers point. + """ + assert len(axis) == 3, "axis should be a list or numpy array of length 3." + # Define the vector part of the quaternion + v = np.array(axis) / np.linalg.norm(axis) * np.sin(theta / 2) + + return quaternion.quaternion(np.cos(theta / 2), v[0], v[1], v[2]) + + +def get_quat_angular_diff( + q1: quaternion.quaternion, q2: quaternion.quaternion +) -> float: + """ + Calculates the relative rotation between quaternions `q1` and `q2`. + Accounts for the double-cover property of quaternions so that if q1 and q2 + are close, you get small angle d_theta rather than something around 2 * np.pi. + """ + dq = q1.conj() * q2 + d_theta = 2 * np.arctan2( + np.linalg.norm(dq.vec), dq.w + ) # atan2 is more robust than using acos + + # Account for double cover where q2 = -q1 gives d_theta = 2 * pi + if d_theta > np.pi: + d_theta = 2 * np.pi - d_theta + + return d_theta # In radians + + +# ========== Equatorial frame functions ============================ + + +def radec2q_eq(ra_rad: float, dec_rad: float, roll_rad: float) -> quaternion.quaternion: + """ + Express the equatorial coordinates (RA, Dec, Roll) in radians + in a quaternion rotation the relative to the Equatorial frame. + + Equatorial frame: +z points to the NCP, +x points to the Vernal equinox + (RA = 0, Dec = 0), +y points to RA = 6h (+pi / 2), Dec = 0. + + Roll: Roll is the angle of north celestial pole (NP) relative to image up. + positive roll when NP s counter-clockwise from image up when looking + out towards the sky towards the East (zero if +y_cam points up along + # the great circle towards the NP). Alternatively, roll is positive when + rotated clockwise when looking from the sky to the camera. + """ + # Intrinsic rotation of q_ra followed by q_dec gives a quaternion rotation + # that points +z towards the boresight of the camera. +y to the left and + # +x down. + q_ra = axis_angle2quat([0, 0, 1], ra_rad) # Rotate frame around z (NCP) + q_dec = axis_angle2quat([0, 1, 0], np.pi / 2 - dec_rad) # Rotate around y' + + # Need to rotate this +90 degrees around z_cam so that +y_cam points up + # and +x_cam points to the left of the Camera frame. In addition, need to + # account for the roll offset of the camera. + q_roll = axis_angle2quat([0, 0, 1], np.pi / 2 - roll_rad) + + # Intrinsic rotation: + q_eq = (q_ra * q_dec * q_roll).normalized() + + return q_eq + + +def q_eq2radec(q_eq2frame: quaternion.quaternion) -> tuple[float, float, float]: + """ + Returns the (ra, dec, roll) angles of the quaterion rotation relative to + the equatorial frame. + """ + # Pure quaternion along camera boresight + pz_frame = q_eq2frame * quaternion.quaternion(0, 0, 0, 1) * q_eq2frame.conj() + # Calculate RA, Dec from the camera boresight: + dec = np.arcsin(pz_frame.z) + ra = np.arctan2(pz_frame.y, pz_frame.x) + + # Pure quaternion along y_cam which points to NCP when roll = 0 + py_cam = q_eq2frame * quaternion.quaternion(0, 0, 1, 0) * q_eq2frame.conj() + # Local East and North vectors (roll is the angle between py_cam and the north vector) + vec_east = np.array([-np.sin(ra), np.cos(ra), 0]) + vec_north = np.array( + [-np.sin(dec) * np.cos(ra), -np.sin(dec) * np.sin(ra), np.cos(dec)] + ) + roll = np.arctan2(np.dot(py_cam.vec, vec_east), np.dot(py_cam.vec, vec_north)) + + return ra, dec, roll # In radians diff --git a/python/PiFinder/solver.py b/python/PiFinder/solver.py index 4d8b2e21d..ee2ce141b 100644 --- a/python/PiFinder/solver.py +++ b/python/PiFinder/solver.py @@ -21,6 +21,7 @@ from PiFinder import state_utils from PiFinder import utils +from PiFinder.pointing_model.astro_coords import initialized_solved_dict from PiFinder.sqm import SQM as SQMCalculator from PiFinder.state import SQM as SQMState @@ -243,6 +244,7 @@ def solver( align_result_queue, camera_command_queue, is_debug=False, + max_imu_ang_during_exposure=1.0, # Max allowed turn during exp [degrees] ): MultiprocLogging.configurer(log_queue) logger.debug("Starting Solver") @@ -251,34 +253,9 @@ def solver( ) align_ra = 0 align_dec = 0 + # Dict of RA, Dec, etc. initialized to None: + solved = initialized_solved_dict() solution = {} - solved = { - # RA, Dec, Roll solved at the center of the camera FoV - # update by integrator - "camera_center": { - "RA": None, - "Dec": None, - "Roll": None, - "Alt": None, - "Az": None, - }, - # RA, Dec, Roll from the camera, not - # affected by IMU in integrator - "camera_solve": { - "RA": None, - "Dec": None, - "Roll": None, - }, - # RA, Dec, Roll at the target pixel - "RA": None, - "Dec": None, - "Roll": None, - "imu_pos": None, - "solve_time": None, - "cam_solve_time": 0, - "last_solve_attempt": 0, # Timestamp of last solve attempt - tracks exposure_end of last processed image - "last_solve_success": None, # Timestamp of last successful solve - } centroids = [] log_no_stars_found = True @@ -343,11 +320,12 @@ def solver( is_new_image = ( last_image_metadata["exposure_end"] > solved["last_solve_attempt"] ) - is_stationary = last_image_metadata["imu_delta"] < 1 + # Use configured max_imu_ang_during_exposure (degrees) + is_stationary = last_image_metadata["imu_delta"] < max_imu_ang_during_exposure if is_new_image and not is_stationary: logger.debug( - f"Skipping image - IMU delta {last_image_metadata['imu_delta']:.2f}° >= 1° (moving)" + f"Skipping image - IMU delta {last_image_metadata['imu_delta']:.2f}° >= {max_imu_ang_during_exposure}° (moving)" ) if is_new_image and is_stationary: @@ -451,19 +429,21 @@ def solver( solved["camera_center"]["Dec"] = solved["Dec"] solved["camera_center"]["Roll"] = solved["Roll"] - # RA, Dec, Roll at the center of the camera's not imu: + # RA, Dec, Roll at the camera center from plate-solve (no IMU compensation) solved["camera_solve"]["RA"] = solved["RA"] solved["camera_solve"]["Dec"] = solved["Dec"] solved["camera_solve"]["Roll"] = solved["Roll"] + # RA, Dec, Roll at the target pixel: + # Replace the camera center RA/Dec with the RA/Dec for the target pixel solved["RA"] = solved["RA_target"] solved["Dec"] = solved["Dec_target"] - if last_image_metadata["imu"]: - solved["imu_pos"] = last_image_metadata["imu"]["pos"] + + if last_image_metadata.get("imu"): solved["imu_quat"] = last_image_metadata["imu"]["quat"] else: - solved["imu_pos"] = None solved["imu_quat"] = None + solved["solve_time"] = time.time() solved["cam_solve_time"] = solved["solve_time"] # Mark successful solve - use same timestamp as last_solve_attempt for comparison @@ -529,5 +509,5 @@ def solver( logger.error( f"Active threads: {[t.name for t in threading.enumerate()]}" ) - except Exception as e: + except Exception: pass # Don't let diagnostic logging fail diff --git a/python/PiFinder/solver_main.py b/python/PiFinder/solver_main.py new file mode 100644 index 000000000..5964e0e11 --- /dev/null +++ b/python/PiFinder/solver_main.py @@ -0,0 +1,279 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- +""" +This module is the solver +* runs loop looking for new images +* tries to solve them +* If solved, emits solution into queue + +""" + +from PiFinder.multiproclogging import MultiprocLogging +import queue +import numpy as np +import time +import logging +import sys +from time import perf_counter as precision_timestamp +import os +import threading + +from PiFinder import state_utils +from PiFinder import utils + +sys.path.append(str(utils.tetra3_dir)) +import tetra3 +from tetra3 import cedar_detect_client + +logger = logging.getLogger("Solver") + + +def solver( + shared_state, + solver_queue, + camera_image, + console_queue, + log_queue, + align_command_queue, + align_result_queue, + is_debug=False, +): + MultiprocLogging.configurer(log_queue) + logger.debug("Starting Solver") + t3 = tetra3.Tetra3( + str(utils.cwd_dir / "PiFinder/tetra3/tetra3/data/default_database.npz") + ) + align_ra = 0 + align_dec = 0 + solved = { + # RA, Dec, Roll solved at the center of the camera FoV + # update by integrator + "camera_center": { + "RA": None, + "Dec": None, + "Roll": None, + "Alt": None, + "Az": None, + }, + # RA, Dec, Roll from the camera, not + # affected by IMU in integrator + "camera_solve": { + "RA": None, + "Dec": None, + "Roll": None, + }, + # RA, Dec, Roll at the target pixel + "RA": None, + "Dec": None, + "Roll": None, + "imu_pos": None, + "solve_time": None, + "cam_solve_time": 0, + "last_solve_attempt": 0, # Timestamp of last solve attempt - tracks exposure_end of last processed image + "last_solve_success": None, # Timestamp of last successful solve + } + + centroids = [] + log_no_stars_found = True + + while True: + logger.info("Starting Solver Loop") + # Start cedar detect server + try: + cedar_detect = cedar_detect_client.CedarDetectClient( + binary_path=str(utils.cwd_dir / "../bin/cedar-detect-server-") + + shared_state.arch() + ) + except FileNotFoundError as e: + logger.warning( + "Not using cedar_detect, as corresponding file '%s' could not be found", + e.filename, + ) + cedar_detect = None + except ValueError: + logger.exception("Not using cedar_detect") + cedar_detect = None + + try: + while True: + # Loop over any pending commands + # There may be more than one! + command = True + while command: + try: + command = align_command_queue.get(block=False) + print(f"the command is {command}") + except queue.Empty: + command = False + + if command is not False: + if command[0] == "align_on_radec": + logger.debug("Align Command Received") + # search image pixels to find the best match + # for this RA/DEC and set it as alignment pixel + align_ra = command[1] + align_dec = command[2] + + if command[0] == "align_cancel": + align_ra = 0 + align_dec = 0 + + state_utils.sleep_for_framerate(shared_state) + + # use the time the exposure started here to + # reject images started before the last solve + # which might be from the IMU + try: + last_image_metadata = shared_state.last_image_metadata() + except (BrokenPipeError, ConnectionResetError) as e: + logger.error(f"Lost connection to shared state manager: {e}") + + # Check if we should process this image + is_new_image = ( + last_image_metadata["exposure_end"] > solved["last_solve_attempt"] + ) + is_stationary = last_image_metadata["imu_delta"] < 1 + + if is_new_image and not is_stationary: + logger.debug( + f"Skipping image - IMU delta {last_image_metadata['imu_delta']:.2f}° >= 1° (moving)" + ) + + if is_new_image and is_stationary: + img = camera_image.copy() + img = img.convert(mode="L") + np_image = np.asarray(img, dtype=np.uint8) + + # Mark that we're attempting a solve - use image exposure_end timestamp + # This is more accurate than wall clock and ties the attempt to the actual image + solved["last_solve_attempt"] = last_image_metadata["exposure_end"] + + t0 = precision_timestamp() + if cedar_detect is None: + # Use old tetr3 centroider + centroids = tetra3.get_centroids_from_image(np_image) + else: + centroids = cedar_detect.extract_centroids( + np_image, sigma=8, max_size=10, use_binned=True + ) + t_extract = (precision_timestamp() - t0) * 1000 + logger.debug( + "File %s, extracted %d centroids in %.2fms" + % ("camera", len(centroids), t_extract) + ) + + if len(centroids) == 0: + if log_no_stars_found: + logger.info("No stars found, skipping (Logged only once)") + log_no_stars_found = False + # Clear solve results to mark solve as failed (otherwise old values persist) + solved["RA"] = None + solved["Dec"] = None + solved["Matches"] = 0 + else: + log_no_stars_found = True + _solver_args = {} + if align_ra != 0 and align_dec != 0: + _solver_args["target_sky_coord"] = [[align_ra, align_dec]] + + solution = t3.solve_from_centroids( + centroids, + (512, 512), + fov_estimate=12.0, + fov_max_error=4.0, + match_max_error=0.005, + # return_matches=True, + target_pixel=shared_state.solve_pixel(), + solve_timeout=1000, + **_solver_args, + ) + + if "matched_centroids" in solution: + # Don't clutter printed solution with these fields. + # del solution['matched_centroids'] + # del solution['matched_stars'] + del solution["matched_catID"] + del solution["pattern_centroids"] + del solution["epoch_equinox"] + del solution["epoch_proper_motion"] + del solution["cache_hit_fraction"] + + solved |= solution + + total_tetra_time = t_extract + solved["T_solve"] + if total_tetra_time > 1000: + console_queue.put(f"SLV: Long: {total_tetra_time}") + logger.warning("Long solver time: %i", total_tetra_time) + + if solved["RA"] is not None: + # RA, Dec, Roll at the center of the camera's FoV: + solved["camera_center"]["RA"] = solved["RA"] + solved["camera_center"]["Dec"] = solved["Dec"] + solved["camera_center"]["Roll"] = solved["Roll"] + + # RA, Dec, Roll at the center of the camera's not imu: + solved["camera_solve"]["RA"] = solved["RA"] + solved["camera_solve"]["Dec"] = solved["Dec"] + solved["camera_solve"]["Roll"] = solved["Roll"] + # RA, Dec, Roll at the target pixel: + solved["RA"] = solved["RA_target"] + solved["Dec"] = solved["Dec_target"] + if last_image_metadata["imu"]: + solved["imu_pos"] = last_image_metadata["imu"]["pos"] + solved["imu_quat"] = last_image_metadata["imu"]["quat"] + else: + solved["imu_pos"] = None + solved["imu_quat"] = None + solved["solve_time"] = time.time() + solved["cam_solve_time"] = solved["solve_time"] + # Mark successful solve - use same timestamp as last_solve_attempt for comparison + solved["last_solve_success"] = solved["last_solve_attempt"] + + logger.info( + f"Solve SUCCESS - {len(centroids)} centroids → " + f"{solved.get('Matches', 0)} matches, " + f"RMSE: {solved.get('RMSE', 0):.1f}px" + ) + + # See if we are waiting for alignment + if align_ra != 0 and align_dec != 0: + if solved.get("x_target") is not None: + align_target_pixel = ( + solved["y_target"], + solved["x_target"], + ) + logger.debug(f"Align {align_target_pixel=}") + align_result_queue.put( + ["aligned", align_target_pixel] + ) + align_ra = 0 + align_dec = 0 + solved["x_target"] = None + solved["y_target"] = None + else: + # Centroids found but solve failed - clear Matches + solved["Matches"] = 0 + logger.warning( + f"Solve FAILED - {len(centroids)} centroids detected but " + f"pattern match failed (FOV est: 12.0°, max err: 4.0°)" + ) + + # Always push to queue after every solve attempt (success or failure) + solver_queue.put(solved) + except EOFError as eof: + logger.error(f"Main process no longer running for solver: {eof}") + logger.exception(eof) # This logs the full stack trace + # Optionally log additional context + logger.error(f"Current solver state: {solved}") # If you have state info + except Exception as e: + logger.error(f"Exception in Solver: {e.__class__.__name__}: {str(e)}") + logger.exception(e) # Logs the full stack trace + # Log additional context that might be helpful + logger.error(f"Current process ID: {os.getpid()}") + logger.error(f"Current thread: {threading.current_thread().name}") + try: + logger.error( + f"Active threads: {[t.name for t in threading.enumerate()]}" + ) + except Exception as e: + pass # Don't let diagnostic logging fail diff --git a/python/PiFinder/solver_premerge.py b/python/PiFinder/solver_premerge.py new file mode 100644 index 000000000..026ccb8f7 --- /dev/null +++ b/python/PiFinder/solver_premerge.py @@ -0,0 +1,222 @@ +#!/usr/bin/python +# -*- coding:utf-8 -*- +""" +This module is the solver +* runs loop looking for new images +* tries to solve them +* If solved, emits solution into queue + +""" + +from PiFinder.multiproclogging import MultiprocLogging +import queue +import numpy as np +import time +import logging +import sys +from time import perf_counter as precision_timestamp +import os +import threading + +from PiFinder import state_utils +from PiFinder import utils +from PiFinder.pointing_model.astro_coords import initialized_solved_dict + +sys.path.append(str(utils.tetra3_dir)) +import tetra3 +from tetra3 import cedar_detect_client + +logger = logging.getLogger("Solver") + + +def solver( + shared_state, + solver_queue, + camera_image, + console_queue, + log_queue, + align_command_queue, + align_result_queue, + is_debug=False, + max_imu_ang_during_exposure=1.0, # Max allowed turn during exp [degrees] +): + MultiprocLogging.configurer(log_queue) + logger.debug("Starting Solver") + t3 = tetra3.Tetra3( + str(utils.cwd_dir / "PiFinder/tetra3/tetra3/data/default_database.npz") + ) + last_solve_time = 0 + align_ra = 0 + align_dec = 0 + # Dict of RA, Dec, etc. initialized to None: + solved = initialized_solved_dict() + + centroids = [] + log_no_stars_found = True + + while True: + logger.info("Starting Solver Loop") + # Start cedar detect server + try: + cedar_detect = cedar_detect_client.CedarDetectClient( + binary_path=str(utils.cwd_dir / "../bin/cedar-detect-server-") + + shared_state.arch() + ) + except FileNotFoundError as e: + logger.warning( + "Not using cedar_detect, as corresponding file '%s' could not be found", + e.filename, + ) + cedar_detect = None + except ValueError: + logger.exception("Not using cedar_detect") + cedar_detect = None + + try: + while True: + # Loop over any pending commands + # There may be more than one! + command = True + while command: + try: + command = align_command_queue.get(block=False) + print(f"the command is {command}") + except queue.Empty: + command = False + + if command is not False: + if command[0] == "align_on_radec": + logger.debug("Align Command Received") + # search image pixels to find the best match + # for this RA/DEC and set it as alignment pixel + align_ra = command[1] + align_dec = command[2] + + if command[0] == "align_cancel": + align_ra = 0 + align_dec = 0 + + state_utils.sleep_for_framerate(shared_state) + + # use the time the exposure started here to + # reject images started before the last solve + # which might be from the IMU + try: + last_image_metadata = shared_state.last_image_metadata() + except (BrokenPipeError, ConnectionResetError) as e: + logger.error(f"Lost connection to shared state manager: {e}") + if ( + last_image_metadata["exposure_end"] > (last_solve_time) + and last_image_metadata["imu_delta"] < max_imu_ang_during_exposure + ): + img = camera_image.copy() + img = img.convert(mode="L") + np_image = np.asarray(img, dtype=np.uint8) + + t0 = precision_timestamp() + if cedar_detect is None: + # Use old tetr3 centroider + centroids = tetra3.get_centroids_from_image(np_image) + else: + centroids = cedar_detect.extract_centroids( + np_image, sigma=8, max_size=10, use_binned=True + ) + t_extract = (precision_timestamp() - t0) * 1000 + logger.debug( + "File %s, extracted %d centroids in %.2fms" + % ("camera", len(centroids), t_extract) + ) + + if len(centroids) == 0: + if log_no_stars_found: + logger.info("No stars found, skipping (Logged only once)") + log_no_stars_found = False + continue + else: + log_no_stars_found = True + _solver_args = {} + if align_ra != 0 and align_dec != 0: + _solver_args["target_sky_coord"] = [[align_ra, align_dec]] + + solution = t3.solve_from_centroids( + centroids, + (512, 512), + fov_estimate=12.0, + fov_max_error=4.0, + match_max_error=0.005, + # return_matches=True, + target_pixel=shared_state.solve_pixel(), + solve_timeout=1000, + **_solver_args, + ) + + if "matched_centroids" in solution: + # Don't clutter printed solution with these fields. + # del solution['matched_centroids'] + # del solution['matched_stars'] + del solution["matched_catID"] + del solution["pattern_centroids"] + del solution["epoch_equinox"] + del solution["epoch_proper_motion"] + del solution["cache_hit_fraction"] + + solved |= solution + + total_tetra_time = t_extract + solved["T_solve"] + if total_tetra_time > 1000: + console_queue.put(f"SLV: Long: {total_tetra_time}") + logger.warning("Long solver time: %i", total_tetra_time) + + if solved["RA"] is not None: + # RA, Dec, Roll at the center of the camera's FoV: + solved["camera_center"]["RA"] = solved["RA"] + solved["camera_center"]["Dec"] = solved["Dec"] + solved["camera_center"]["Roll"] = solved["Roll"] + + # RA, Dec, Roll at the center of the camera's not imu: + solved["camera_solve"]["RA"] = solved["RA"] + solved["camera_solve"]["Dec"] = solved["Dec"] + solved["camera_solve"]["Roll"] = solved["Roll"] + # RA, Dec, Roll at the target pixel: + solved["RA"] = solved["RA_target"] + solved["Dec"] = solved["Dec_target"] + if last_image_metadata["imu"]: + solved["imu_quat"] = last_image_metadata["imu"]["quat"] + else: + solved["imu_quat"] = None + solved["solve_time"] = time.time() + solved["cam_solve_time"] = solved["solve_time"] + solver_queue.put(solved) + + # See if we are waiting for alignment + if align_ra != 0 and align_dec != 0: + if solved.get("x_target") is not None: + align_target_pixel = ( + solved["y_target"], + solved["x_target"], + ) + logger.debug(f"Align {align_target_pixel=}") + align_result_queue.put(["aligned", align_target_pixel]) + align_ra = 0 + align_dec = 0 + solved["x_target"] = None + solved["y_target"] = None + + last_solve_time = last_image_metadata["exposure_end"] + except EOFError as eof: + logger.error(f"Main process no longer running for solver: {eof}") + logger.exception(eof) # This logs the full stack trace + # Optionally log additional context + logger.error(f"Current solver state: {solved}") # If you have state info + except Exception as e: + logger.error(f"Exception in Solver: {e.__class__.__name__}: {str(e)}") + logger.exception(e) # Logs the full stack trace + # Log additional context that might be helpful + logger.error(f"Current process ID: {os.getpid()}") + logger.error(f"Current thread: {threading.current_thread().name}") + try: + logger.error( + f"Active threads: {[t.name for t in threading.enumerate()]}" + ) + except Exception as e: + pass # Don't let diagnostic logging fail diff --git a/python/PiFinder/state.py b/python/PiFinder/state.py index 2fc6fd027..c938d69f1 100644 --- a/python/PiFinder/state.py +++ b/python/PiFinder/state.py @@ -126,12 +126,12 @@ def __repr__(self): SharedStateObj( power_state=1, solve_state=True, - solution={'RA': 22.86683471463411, 'Dec': 15.347716050003328, 'imu_pos': [171.39798541261814, 202.7646132036331, 358.2794741322842], + solution={'RA': 22.86683471463411, 'Dec': 15.347716050003328, 'solve_time': 1695297930.5532792, 'cam_solve_time': 1695297930.5532837, 'Roll': 306.2951794424281, 'FOV': 10.200729425086111, 'RMSE': 21.995567413046142, 'Matches': 12, 'Prob': 6.987725483613384e-13, 'T_solve': 15.00384000246413, 'RA_target': 22.86683471463411, 'Dec_target': 15.347716050003328, 'T_extract': 75.79255499877036, 'Alt': None, 'Az': None, 'solve_source': 'CAM', 'constellation': 'Psc'}, - imu={'moving': False, 'move_start': 1695297928.69749, 'move_end': 1695297928.764207, 'pos': [171.39798541261814, 202.7646132036331, 358.2794741322842], - 'start_pos': [171.4009455613444, 202.76321535004726, 358.2587208386012], 'status': 3}, + imu={'moving': False, 'move_start': 1695297928.69749, 'move_end': 1695297928.764207, + 'status': 3}, location={'lat': 59.05139745, 'lon': 7.987654, 'altitude': 151.4, 'source': 'GPS', gps_lock': False, 'timezone': 'Europe/Stockholm', 'last_gps_lock': None}, datetime=None, screen=, @@ -251,7 +251,7 @@ def __init__(self): "exposure_end": 0, "exposure_time": 500000, # Default exposure time in microseconds (0.5s) "imu": None, - "imu_delta": 0, + "imu_delta": 0.0, # Angle between quaternion at start and end of exposure [deg] } self.__solution = None self.__sats = None diff --git a/python/PiFinder/ui/base.py b/python/PiFinder/ui/base.py index e09880ed8..d032a9416 100644 --- a/python/PiFinder/ui/base.py +++ b/python/PiFinder/ui/base.py @@ -284,7 +284,7 @@ def screen_update(self, title_bar=True, button_hints=True) -> None: (6, 1), _(self.title), font=self.fonts.bold.font, fill=fg ) imu = self.shared_state.imu() - moving = True if imu and imu["pos"] and imu["moving"] else False + moving = True if imu and imu["quat"] and imu["moving"] else False # GPS status if self.shared_state.altaz_ready(): diff --git a/python/PiFinder/ui/console.py b/python/PiFinder/ui/console.py index 2c3471ecf..2fc4a9d5c 100644 --- a/python/PiFinder/ui/console.py +++ b/python/PiFinder/ui/console.py @@ -135,7 +135,7 @@ def screen_update(self, title_bar=True, button_hints=True): ) self.draw.text((6, 1), self.title, font=self.fonts.bold.font, fill=fg) imu = self.shared_state.imu() - moving = True if imu and imu["pos"] and imu["moving"] else False + moving = True if imu and imu["quat"] and imu["moving"] else False # GPS status if self.shared_state.altaz_ready(): diff --git a/python/PiFinder/ui/status.py b/python/PiFinder/ui/status.py index 2f5324690..69aedc752 100644 --- a/python/PiFinder/ui/status.py +++ b/python/PiFinder/ui/status.py @@ -257,14 +257,15 @@ def update_status_dict(self): imu = self.shared_state.imu() if imu: - if imu["pos"] is not None: + if imu["quat"] is not None: if imu["moving"]: mtext = "Moving" else: mtext = "Static" self.status_dict["IMU"] = f"{mtext : >11}" + " " + str(imu["status"]) self.status_dict["IMU PS"] = ( - f"{imu['pos'][0] : >6.1f}/{imu['pos'][2] : >6.1f}" + "imu NA" + # f"{imu['quat'][0] : >6.1f}/{imu['quat'][2] : >6.1f}" ) location = self.shared_state.location() sats = self.shared_state.sats() diff --git a/python/pyproject.toml b/python/pyproject.toml index 5617b6ec3..fe871ce19 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -128,6 +128,7 @@ module = [ 'sklearn.*', 'PyHotKey.*', 'PiFinder.tetra3.*', + 'quaternion', 'tetra3.*', 'grpc', 'ceder_detect_pb2', diff --git a/python/requirements.txt b/python/requirements.txt index 5f92091ab..2fa71abcf 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -9,7 +9,8 @@ json5==0.9.25 luma.oled==3.12.0 luma.lcd==2.11.0 pillow==10.4.0 -numpy==1.26.2 +numpy==1.26.4 +numpy-quaternion==2023.0.4 pandas==1.5.3 pydeepskylog==1.3.2 pyjwt==2.8.0