Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ def main():
def parse_ws_command(ws_cmd: str, serial_commands: Queue[list[str]], telemetry_commands: Queue[list[str]]) -> None:
"""Parses a websocket command and places it on the correct process queue (telemetry or serial)."""

# If the client id exists, then parse it out
# Example format with a client id: "__client_id__7ecc8f4e telemetry replay play july_12th"
client_id = ""
if ws_cmd.startswith("__client_id__"):
parts = ws_cmd.split(" ", 1)
client_id = parts[0].replace("__client_id__", "")
ws_cmd = parts[1] if len(parts) > 1 else ""

# Remove special characters
parsed_command = sub(r"[^\da-zA-Z_./\s-]+", "", ws_cmd).split(" ")

Expand All @@ -139,7 +147,9 @@ def parse_ws_command(ws_cmd: str, serial_commands: Queue[list[str]], telemetry_c
case "serial":
serial_commands.put(parsed_command[1:])
case "telemetry":
telemetry_commands.put(parsed_command[1:])
# Include client id as the first element for telemetry commands
# This is so it can send specific data to specific clients (replay data, anything else we want to add in the future)
telemetry_commands.put([client_id] + parsed_command[1:])
case "shutdown":
raise ShutdownException
case _:
Expand Down
21 changes: 21 additions & 0 deletions main_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import logging
from datetime import datetime
from pathlib import Path

from src.ground_station_v2.api import run_server

Path("logs").mkdir(exist_ok=True)

log_file = f"logs/ground_station_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

logging.basicConfig(
level=logging.INFO,
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file),
],
)

if __name__ == "__main__":
run_server()

5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ pyserial>=3.5
pytest>=7.4.3
PyYAML>=6.0.1
setuptools>=68.2.2
tornado>=6.2
tornado>=6.2
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
websockets>=12.0
237 changes: 166 additions & 71 deletions src/ground_station/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ def __init__(
self.mission_recording_file: TextIOWrapper[BufferedWriter] | None = None

# Replay System
self.replay: Process | None = None
self.replay_input: Queue[str] = mp.Queue() # type:ignore
self.replay_output: Queue[str] = mp.Queue() # type:ignore
self.replays: dict[str, Process] = {} # client_id -> Process
self.replay_inputs: dict[str, Queue[str]] = {} # client_id -> input queue
self.replay_outputs: dict[str, Queue[str]] = {} # client_id -> output queue
self.replay_telemetry_data: dict[str, TelemetryBuffer] = {} # client_id -> per-client telemetry buffer

# Handle program closing to ensure no orphan processes
signal(SIGTERM, shutdown_sequence) # type:ignore
Expand All @@ -99,9 +100,11 @@ def run(self):
try:
# Parse websocket command into an enum
commands: list[str] = self.telemetry_ws_commands.get()
# Extract client_id from the first element (may be empty string)
client_id = commands.pop(0) if commands else ""
command = wsc.parse(commands, wsc.WebsocketCommand)
parameters = commands # Remaining items in the commands list are parameters
self.execute_command(command, parameters)
self.execute_command(command, parameters, client_id)
except AttributeError as e:
logger.error(e)
except wsc.WebsocketCommandNotFound as e:
Expand All @@ -117,25 +120,43 @@ def run(self):
self.parse_serial_status(command=x[0], data=x[1])
self.update_websocket()

# Switch data queues between replay and radio depending on mission state
match self.status.mission.state:
case MissionState.RECORDED:
while not self.replay_output.empty():
self.process_transmission(self.replay_output.get())
self.update_websocket()
case _:
while not self.rn2483_radio_payloads.empty():
self.process_transmission(self.rn2483_radio_payloads.get())
self.update_websocket()

def update_websocket(self) -> None:
"""Updates the websocket with the latest packet using the JSON output process."""
# Process replay outputs for each active replay client
for client_id, replay_output in list(self.replay_outputs.items()):
# Safety check: skip if this client's replay was stopped during iteration
if client_id not in self.replay_outputs:
continue
while not replay_output.empty():
# Process replay data into per-client buffer, not shared buffer
self.process_transmission(replay_output.get(), replay_client_id=client_id)
# Send replay data only to the specific client
self.update_websocket(target=client_id)

# Process live radio data (send to all clients)
while not self.rn2483_radio_payloads.empty():
self.process_transmission(self.rn2483_radio_payloads.get())
# Send live data to all clients
self.update_websocket()

def update_websocket(self, target: str | None = None) -> None:
"""Updates the websocket with the latest packet using the JSON output process.

Args:
target: Optional client ID to send the message to. If None or empty, sends to all clients.
"""
# Treat empty string same as None for broadcasting
if target == "":
target = None

# Use per-client buffer if this is for a replay client, otherwise use shared buffer
telemetry_data = self.replay_telemetry_data.get(target, self.telemetry_data) if target else self.telemetry_data

websocket_response = {
"org": self.config.organization,
"rocket": self.config.rocket_name,
"version": self.version,
"status": dict(self.status),
"telemetry": self.telemetry_data.get(),
"telemetry": telemetry_data.get(),
"target": target,
}
self.telemetry_json_output.put(websocket_response)

Expand Down Expand Up @@ -164,8 +185,14 @@ def parse_serial_status(self, command: str, data: str) -> None:
case _:
return None

def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
"""Executes the passed websocket command."""
def execute_command(self, command: wsc.Enum, parameters: list[str], client_id: str = "") -> None:
"""Executes the passed websocket command.

Args:
command: The parsed websocket command enum.
parameters: Additional parameters for the command.
client_id: The ID of the client that sent the command (for replay targeting).
"""

WSCommand = wsc.WebsocketCommand
match command:
Expand All @@ -176,21 +203,47 @@ def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
case WSCommand.REPLAY.value.PLAY:
if not parameters:
raise ReplayPlaybackError
if not client_id:
logger.error("Replay commands require a valid client_id")
return
mission_name = " ".join(parameters)
try:
self.play_mission(mission_name)
self.play_mission(mission_name, client_id)
except MissionNotFoundError as e:
logger.error(e.message)
except ReplayPlaybackError as e:
logger.error(e.message)
# Update only the specific client for replay commands
self.update_websocket(target=client_id)
return # Don't broadcast to all clients
case WSCommand.REPLAY.value.PAUSE:
self.set_replay_speed(0.0)
if not client_id:
logger.error("Replay commands require a valid client_id")
return
self.set_replay_speed(0.0, client_id)
self.update_websocket(target=client_id)
return
case WSCommand.REPLAY.value.RESUME:
self.set_replay_speed(self.status.replay.last_played_speed)
if not client_id:
logger.error("Replay commands require a valid client_id")
return
self.set_replay_speed(self.status.replay.last_played_speed, client_id)
self.update_websocket(target=client_id)
return
case WSCommand.REPLAY.value.SPEED:
self.set_replay_speed(float(parameters[0]))
if not client_id:
logger.error("Replay commands require a valid client_id")
return
self.set_replay_speed(float(parameters[0]), client_id)
self.update_websocket(target=client_id)
return
case WSCommand.REPLAY.value.STOP:
self.stop_replay()
if not client_id:
logger.error("Replay commands require a valid client_id")
return
self.stop_replay(client_id)
self.update_websocket(target=client_id)
return

# Record commands
case WSCommand.RECORD.value.STOP:
Expand All @@ -209,8 +262,8 @@ def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:

self.update_websocket()

def set_replay_speed(self, speed: float):
"""Set the playback speed of the replay system."""
def set_replay_speed(self, speed: float, client_id: str):
"""Set the playback speed of the replay system for a specific client."""
try:
speed = 0.0 if float(speed) < 0 else float(speed)
except ValueError:
Expand All @@ -221,32 +274,63 @@ def set_replay_speed(self, speed: float):
self.status.replay.speed = speed

# Set replay status based on speed
# If mission is not recorded, replay should be in DNE state.
# If there are no active replays, replay should be in DNE state.
# if else, set to pause/playing based on speed
if self.status.mission.state != MissionState.RECORDED:
if not self.replays:
self.status.replay.state = ReplayState.DNE
elif speed == 0.0:
self.status.replay.state = ReplayState.PAUSED
self.replay_input.put(f"speed {speed}")
if client_id in self.replay_inputs:
self.replay_inputs[client_id].put(f"speed {speed}")
else:
self.status.replay.state = ReplayState.PLAYING
self.replay_input.put(f"speed {speed}")

def stop_replay(self) -> None:
"""Stops the replay."""

logger.info("REPLAY STOP")

if self.replay is not None:
self.replay.terminate()
self.replay = None

# Empty replay output
self.replay_output: Queue[str] = mp.Queue() # type:ignore
self.reset_data()

def play_mission(self, mission_name: str) -> None:
"""Plays the desired mission recording."""
if client_id in self.replay_inputs:
self.replay_inputs[client_id].put(f"speed {speed}")

logger.info(f"REPLAY SPEED {speed} for client {client_id}")

def stop_replay(self, client_id: str | None = None) -> None:
"""Stops the replay for a specific client, or all replays if client_id is None."""

if client_id:
# Stop specific client's replay
logger.info(f"REPLAY STOP for client {client_id}")
if client_id in self.replays:
self.replays[client_id].terminate()
del self.replays[client_id]
if client_id in self.replay_inputs:
# Flush and delete input queue
while not self.replay_inputs[client_id].empty():
self.replay_inputs[client_id].get()
del self.replay_inputs[client_id]
if client_id in self.replay_outputs:
# Flush and delete output queue to prevent old packets from being processed
while not self.replay_outputs[client_id].empty():
self.replay_outputs[client_id].get()
del self.replay_outputs[client_id]
if client_id in self.replay_telemetry_data:
# Clean up per-client telemetry buffer
del self.replay_telemetry_data[client_id]
else:
# Stop all replays
logger.info("REPLAY STOP ALL")
for replay_process in self.replays.values():
replay_process.terminate()
# Flush all queues
for queue in self.replay_inputs.values():
while not queue.empty():
queue.get()
for queue in self.replay_outputs.values():
while not queue.empty():
queue.get()
self.replays.clear()
self.replay_inputs.clear()
self.replay_outputs.clear()
self.replay_telemetry_data.clear()
self.reset_data()

def play_mission(self, mission_name: str, client_id: str) -> None:
"""Plays the desired mission recording for a specific client."""

# Ensure not doing anything silly
if self.status.mission.recording:
Expand All @@ -256,30 +340,33 @@ def play_mission(self, mission_name: str) -> None:
if mission_file not in self.status.replay.mission_files_list:
raise MissionNotFoundError(mission_name)

# Set output data to current mission
self.status.mission.name = mission_name

# We are not to record when replaying missions
self.status.mission.state = MissionState.RECORDED
self.status.mission.recording = False

# Replay system
if self.replay is None:
self.replay = Process(
target=TelemetryReplay(
self.replay_output,
self.replay_input,
self.status.replay.speed,
mission_file,
).run
)
self.replay.start()
# Stop any existing replay for this client
if client_id in self.replays:
self.stop_replay(client_id)

# Create new queues for this client's replay
self.replay_inputs[client_id] = mp.Queue() # type:ignore
self.replay_outputs[client_id] = mp.Queue() # type:ignore
# Create per-client telemetry buffer for this replay
self.replay_telemetry_data[client_id] = TelemetryBuffer(self.config.telemetry_buffer_size)

# Create and start replay process for this client
self.replays[client_id] = Process(
target=TelemetryReplay(
self.replay_outputs[client_id],
self.replay_inputs[client_id],
self.status.replay.speed,
mission_file,
).run
)
self.replays[client_id].start()

self.set_replay_speed(
speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1,
client_id=client_id,
)

logger.info(f"REPLAY {mission_name} PLAYING")
logger.info(f"REPLAY {mission_name} PLAYING for client {client_id}")

def start_recording(self, mission_name: str | None = None) -> None:
"""Starts recording the current mission. If no mission name is given, the recording epoch is used."""
Expand All @@ -302,8 +389,13 @@ def stop_recording(self) -> None:
logger.info("Recording stopped")
# TODO

def process_transmission(self, data: str) -> None:
"""Processes the incoming radio transmission data."""
def process_transmission(self, data: str, replay_client_id: str | None = None) -> None:
"""Processes the incoming radio transmission data.

Args:
data: The transmission data to process
replay_client_id: If provided, updates the per-client replay buffer instead of shared buffer
"""
# Always write data to file when recording, even if it can't be parsed correctly
if self.status.mission.recording and self.mission_recording_file:
logger.info(f"Recording: {data}")
Expand All @@ -313,8 +405,11 @@ def process_transmission(self, data: str) -> None:
# Parse the transmission, if result is not null, update telemetry data
parsed: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
if parsed and parsed.blocks:
# Updates the telemetry buffer with the latest block data and latest mission time
self.telemetry_data.add(parsed.blocks)
# Update the appropriate buffer: per-client for replay, shared for live data
if replay_client_id and replay_client_id in self.replay_telemetry_data:
self.replay_telemetry_data[replay_client_id].add(parsed.blocks)
else:
self.telemetry_data.add(parsed.blocks)
except Exception as e:
print(e)
logger.error(e)
Loading
Loading