ohadstoller/recording-parts-queue

Recording Parts Queue System

Overview

A TypeScript-based distributed recording processing pipeline simulator with queue management, concurrent processing, and ordered result merging.

Features

  • ✅ Asynchronous part processing
  • ✅ Out-of-order part arrival handling
  • ✅ Concurrent STT and FFMPEG processing
  • ✅ Ordered merge logic
  • ✅ Multiple session support
  • ✅ CLI tools for monitoring
  • ✅ Comprehensive logging
  • ✅ >70% test coverage

Prerequisites

  • Node.js 18 or later
  • npm or yarn

Installation

git clone <repo-url>
cd recording-parts-queue
npm install
cp .env.example .env

Configuration

Edit .env file:

PORT=3000
MAX_CONCURRENT_PROCESSING=5
STT_MIN_DELAY_MS=5000
# ... see .env.example for all options
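
As a rough illustration of how these settings might be read at startup, the sketch below parses the three variables shown above from an environment-like object. The `loadConfig` function, the `AppConfig` shape, and the fallback values are assumptions for illustration (the fallbacks simply mirror the sample values above); the project's actual loader may differ.

```typescript
// Hypothetical config shape; field names are illustrative, not the project's.
interface AppConfig {
  port: number;
  maxConcurrentProcessing: number;
  sttMinDelayMs: number;
}

type Env = Record<string, string | undefined>;

// Parse a non-negative integer setting. Falls back when the variable is unset
// or blank, and throws when it is set to something that is not an integer.
function readInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value < 0) {
    throw new Error(`${key} must be a non-negative integer, got "${raw}"`);
  }
  return value;
}

function loadConfig(env: Env): AppConfig {
  return {
    port: readInt(env, "PORT", 3000),
    maxConcurrentProcessing: readInt(env, "MAX_CONCURRENT_PROCESSING", 5),
    sttMinDelayMs: readInt(env, "STT_MIN_DELAY_MS", 5000),
  };
}
```

In a Node.js process this would be called as `loadConfig(process.env)`. Failing fast on a malformed value keeps a typo in .env from silently turning into a default.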

Usage

Start Server

npm run dev          # Development with auto-reload
npm start            # Production

Run Recorder Simulator

npm run recorder     # Single session
npm run recorder 3   # Three concurrent sessions

Monitor System

View Logs

npm run logs         # View all logs
npm run logs:follow  # Follow in real-time

Check Status

npm run status              # View all sessions
npm run status:detailed     # Detailed view with previews

View Output

npm run output              # View merged results
npm run output:data         # Text only
npm run output:audio        # Audio only

Architecture

Components

  • Express Server: HTTP API for receiving parts
  • Queue Service: p-queue for concurrent processing
  • STT Service: Simulated speech-to-text (5-15s delay)
  • FFMPEG Service: Simulated audio merging (5-15s delay)
  • Session Manager: In-memory session state tracking
  • Merge Logic: Ordered result merging
  • Recorder Simulator: Test data generator

Data Flow

  1. Recorder sends parts → Server
  2. Server validates → Queue
  3. Queue processes via STT + FFMPEG (parallel)
  4. Results buffered by part index
  5. Merge consecutive completed parts
  6. Write to files when complete

API Endpoints

POST /api/recording-parts

Submit a recording part for processing.

Request:

{
  "idRecord": "uuid",
  "partIndex": 0,
  "isLast": false
}

Response:

{
  "success": true,
  "message": "Part queued for processing",
  "data": {
    "idRecord": "uuid",
    "partIndex": 0,
    "queuePosition": 3
  }
}
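
The request body above can be checked before a part is queued. The sketch below is a dependency-free validator for that shape. The project lists Zod for runtime validation, so treat `parseRecordingPart`, `ParseResult`, and the error messages as a hypothetical stand-in rather than the actual schema.

```typescript
// Shape taken from the request example above.
interface RecordingPart {
  idRecord: string;
  partIndex: number;
  isLast: boolean;
}

type ParseResult =
  | { ok: true; value: RecordingPart }
  | { ok: false; error: string };

// Hypothetical validator: checks field types and that partIndex is a
// non-negative integer. It does not verify that idRecord is a real UUID.
function parseRecordingPart(body: unknown): ParseResult {
  if (typeof body !== "object" || body === null) {
    return { ok: false, error: "body must be a JSON object" };
  }
  const { idRecord, partIndex, isLast } = body as Record<string, unknown>;
  if (typeof idRecord !== "string" || idRecord.length === 0) {
    return { ok: false, error: "idRecord must be a non-empty string" };
  }
  if (typeof partIndex !== "number" || !Number.isInteger(partIndex) || partIndex < 0) {
    return { ok: false, error: "partIndex must be a non-negative integer" };
  }
  if (typeof isLast !== "boolean") {
    return { ok: false, error: "isLast must be a boolean" };
  }
  return { ok: true, value: { idRecord, partIndex, isLast } };
}
```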

GET /api/sessions

Get all recording sessions and their status.

Response:

{
  "success": true,
  "data": [
    {
      "idRecord": "uuid",
      "totalParts": 5,
      "processedParts": 5,
      "isComplete": true,
      "mergedText": "Part 0 text...\nPart 1 text...",
      "mergedAudio": "Part 0 audio...\nPart 1 audio...",
      "createdAt": "2025-10-05T10:00:00.000Z",
      "completedAt": "2025-10-05T10:03:45.000Z"
    }
  ]
}

GET /health

Health check endpoint.

Response:

{
  "status": "healthy",
  "uptime": 3600,
  "timestamp": "2025-10-05T10:00:00.000Z"
}

Testing

Run Tests

npm test                # Run all tests
npm run test:watch      # Watch mode
npm run test:coverage   # With coverage report

Test Coverage

Current coverage: >86% (branches, functions, lines, statements)

Project Structure

src/
├── types/           # TypeScript interfaces
├── utils/           # Logger, validation, helpers
├── services/        # STT, FFMPEG, Queue services
├── session/         # Session manager
├── server/          # Express app, routes, middleware
├── recorder/        # Simulator
└── cli/             # CLI commands

tests/
├── services/        # Unit tests
├── session/         # Session manager tests
└── integration/     # Full flow tests

How It Works

Ordered Merging

Parts may arrive out of order, but a part is merged only once every part before it has been merged:

  • Received: [0, 2, 3] → Merge [0] (parts 2 and 3 wait for part 1)
  • Received: [1] → Merge [1, 2, 3]
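
The consecutive-merge rule can be sketched as a small buffer keyed by part index. `MergeBuffer` and its method names are hypothetical; `nextMergeIndex` borrows the name used under Completion Detection, and joining parts with a newline follows the `mergedText` example in the API section.

```typescript
// Hypothetical buffer: holds finished parts until all earlier parts are merged.
class MergeBuffer {
  private pending = new Map<number, string>();
  private merged: string[] = [];
  nextMergeIndex = 0;

  // Record a finished part and return the indices merged as a result
  // (an empty array when the part has to wait for an earlier one).
  add(partIndex: number, text: string): number[] {
    // Ignore parts that were already merged or are already waiting.
    if (partIndex < this.nextMergeIndex || this.pending.has(partIndex)) {
      return [];
    }
    this.pending.set(partIndex, text);

    const flushed: number[] = [];
    // Drain the buffer for as long as the next expected index is available.
    while (this.pending.has(this.nextMergeIndex)) {
      this.merged.push(this.pending.get(this.nextMergeIndex) as string);
      this.pending.delete(this.nextMergeIndex);
      flushed.push(this.nextMergeIndex);
      this.nextMergeIndex += 1;
    }
    return flushed;
  }

  get mergedText(): string {
    return this.merged.join("\n");
  }
}

const buffer = new MergeBuffer();
buffer.add(0, "Part 0 text"); // merges [0]
buffer.add(2, "Part 2 text"); // merges nothing: part 1 is missing
buffer.add(3, "Part 3 text"); // merges nothing
buffer.add(1, "Part 1 text"); // merges [1, 2, 3]
```

Each part is inserted and removed from the map exactly once, so the total merge work stays linear in the number of parts regardless of arrival order.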

Completion Detection

Session completes when:

  1. Total parts known (isLast received)
  2. All parts processed
  3. All parts merged (nextMergeIndex === totalParts)
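
Expressed as a predicate, the three conditions might look like the sketch below. The field names follow the API response and the `nextMergeIndex` check above, but representing an unknown total as `null` and deriving the total as the last part's index plus one (zero-based numbering) are assumptions, and `withLastPart` and `isSessionComplete` are hypothetical names.

```typescript
interface SessionState {
  totalParts: number | null; // assumed null until the part flagged isLast arrives
  processedParts: number;
  nextMergeIndex: number;
}

// Hypothetical: record the total once the part flagged isLast is received.
function withLastPart(state: SessionState, lastPartIndex: number): SessionState {
  return { ...state, totalParts: lastPartIndex + 1 };
}

function isSessionComplete(state: SessionState): boolean {
  if (state.totalParts === null) return false; // 1. total not yet known
  if (state.processedParts < state.totalParts) return false; // 2. parts still processing
  return state.nextMergeIndex === state.totalParts; // 3. everything merged in order
}
```

Because parts can arrive out of order, the part flagged isLast may be received before earlier parts, which is why knowing the total is necessary but not sufficient.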

Concurrent Processing

Each part goes through STT and FFMPEG processing simultaneously:

  • STT: Simulated speech-to-text processing
  • FFMPEG: Simulated audio merging
  • Both run in parallel, so a part takes roughly as long as the slower of the two steps rather than their sum
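
A minimal sketch of the parallel step, using `Promise.all` to run both simulated services for one part. The `simulateStt`, `simulateFfmpeg`, and `processPart` functions, their return strings, and the short delays are placeholders; per the component list, the real simulated services wait 5-15s.

```typescript
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Placeholder services; delays are shortened for illustration.
async function simulateStt(partIndex: number, delayMs = 20): Promise<string> {
  await sleep(delayMs);
  return `Part ${partIndex} text`;
}

async function simulateFfmpeg(partIndex: number, delayMs = 20): Promise<string> {
  await sleep(delayMs);
  return `Part ${partIndex} audio`;
}

interface PartResult {
  partIndex: number;
  text: string;
  audio: string;
}

// Both promises are created before either is awaited, so the two delays
// overlap instead of adding up.
async function processPart(partIndex: number): Promise<PartResult> {
  const [text, audio] = await Promise.all([
    simulateStt(partIndex),
    simulateFfmpeg(partIndex),
  ]);
  return { partIndex, text, audio };
}
```

Note that `Promise.all` rejects as soon as either step fails, so a caller that wants partial results would need `Promise.allSettled` instead.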

Queue Management

  • Uses p-queue for concurrency control
  • Configurable max concurrent processing (default: 5)
  • Non-blocking API responses
  • In-memory session state
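
To show what concurrency control means here, the sketch below is a hand-rolled limiter that runs at most N tasks at a time. It is not p-queue and not the project's code; `LimitedQueue` is a hypothetical stand-in that only mimics the idea of an `add` method with a concurrency cap.

```typescript
// Hand-rolled stand-in for a concurrency-limited queue.
class LimitedQueue {
  private waiting: Array<() => void> = [];
  private active = 0;
  private readonly concurrency: number;

  constructor(concurrency: number) {
    this.concurrency = concurrency;
  }

  // Number of tasks still waiting for a free slot.
  get size(): number {
    return this.waiting.length;
  }

  // Schedule a task. The returned promise settles with the task's outcome.
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = (): void => {
        this.active += 1;
        task()
          .then(resolve, reject)
          .finally(() => {
            // Free the slot and start the oldest waiting task, if any.
            this.active -= 1;
            const next = this.waiting.shift();
            if (next) next();
          });
      };
      if (this.active < this.concurrency) run();
      else this.waiting.push(run);
    });
  }
}
```

A request handler could call `add` without awaiting the returned promise and respond immediately, which is one way to keep API responses non-blocking while processing continues in the background.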

CLI Commands

Available Commands

npm run help              # Show all available commands
npm run logs              # View application logs
npm run logs:follow       # Follow logs in real-time
npm run status            # View session status
npm run status:detailed   # Detailed session view
npm run output            # View merged output files
npm run output:data       # View text output only
npm run output:audio      # View audio output only
npm run clean             # Clean logs and output directories

Example Workflow

# Terminal 1: Start server
npm run dev

# Terminal 2: Send recordings
npm run recorder 2

# Terminal 3: Monitor processing
npm run logs:follow

# After completion
npm run status
npm run output

Troubleshooting

Server won't start:

  • Check port is not in use: lsof -i :3000
  • Verify .env file exists

Parts not processing:

  • Check logs: npm run logs:follow
  • Verify server is running
  • Check concurrency setting

Output files not created:

  • Wait for all parts to complete
  • Check write permissions on OUTPUT_DIR
  • Verify session completed: npm run status

No sessions showing:

  • Ensure server is running: npm run dev
  • Check server URL in .env: SERVER_URL=http://localhost:3000
  • Verify API connectivity: curl http://localhost:3000/health

Development

Add New Feature

  1. Create feature branch
  2. Write tests first
  3. Implement feature
  4. Ensure tests pass and coverage maintained
  5. Update documentation

Code Style

  • TypeScript strict mode
  • No any types
  • Functions <50 lines
  • Files <300 lines
  • ESLint compliant

Environment Variables

See .env.example for all available configuration options:

  • Server settings (PORT, NODE_ENV)
  • Processing delays (STT_MIN_DELAY_MS, etc.)
  • Concurrency limits (MAX_CONCURRENT_PROCESSING)
  • Recorder simulation settings
  • Logging configuration
  • Output directory settings

System Requirements

Performance

  • Handles 10+ concurrent sessions
  • Up to 5 concurrent part processes
  • API response time <100ms (excluding processing)

Reliability

  • No data loss during processing
  • Idempotent handling for duplicate parts
  • Graceful error handling and recovery
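
One common way to get idempotent handling of duplicate parts is to remember which (session, part index) pairs have already been accepted. How this project implements it is not shown here, so the `SeenParts` class below is purely an illustrative assumption.

```typescript
// Hypothetical registry of accepted parts, keyed by session id and part index.
class SeenParts {
  private seen = new Set<string>();

  // Returns true the first time a part is offered and false for any repeat,
  // so the caller can skip re-queuing work it has already scheduled.
  accept(idRecord: string, partIndex: number): boolean {
    const key = `${idRecord}:${partIndex}`;
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    return true;
  }
}
```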

Logging

  • Structured JSON logs via Pino
  • Configurable log levels
  • File and console output options

Technical Details

Key Algorithms

  • Fisher-Yates Shuffle: For out-of-order part simulation
  • Consecutive Merge: Only merge parts when sequence is complete
  • Completion Buffer: Track processed parts for ordered assembly
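
For reference, a Fisher-Yates shuffle can be written as below. This is a generic sketch rather than the simulator's code: `shuffle` and `shuffledPartIndices` are hypothetical names, and the injectable `random` parameter is an addition that makes the result reproducible in tests.

```typescript
// Fisher-Yates shuffle: returns a shuffled copy and leaves the input untouched.
function shuffle<T>(items: readonly T[], random: () => number = Math.random): T[] {
  const result = [...items];
  for (let i = result.length - 1; i > 0; i--) {
    // Pick j uniformly from 0..i inclusive, then swap positions i and j.
    const j = Math.floor(random() * (i + 1));
    [result[i], result[j]] = [result[j], result[i]];
  }
  return result;
}

// Hypothetical helper: part indices 0..count-1 in a random send order.
function shuffledPartIndices(count: number): number[] {
  return shuffle(Array.from({ length: count }, (_, i) => i));
}
```

Drawing `j` from the range 0..i (not 0..length-1) is what makes every permutation equally likely, given a uniform `random`.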

Dependencies

  • Express: Lightweight HTTP server
  • p-queue: In-memory queue with concurrency control
  • Pino: High-performance structured logging
  • Zod: Runtime type validation
  • UUID: Session identifier generation
  • Axios: HTTP client for recorder simulator

License

MIT


🎉 Recording Parts Queue System - Production Ready!

This system successfully demonstrates distributed recording processing with queue management, concurrent processing, and ordered result merging - all implemented in TypeScript with comprehensive testing and monitoring capabilities.
