adding back pressure limits #3

@devfire

Description

Updated Critical Analysis Report: Addressing Concurrency Issues in main.rs

Based on your feedback, I've incorporated a detailed suggestion for fixing the concurrency disaster in src/main.rs. The original analysis highlighted how the unbounded tokio::spawn for every incoming UDP packet creates a DoS vulnerability through resource exhaustion (e.g., spawning thousands of tasks on a flood of packets). Below, I reiterate the key problem and provide the best way to address it, including a proposed code refactor. This fix focuses on bounding concurrency, adding backpressure, and rate limiting to make the server resilient.

The Problem (Recap)

  • The main loop calls sock.recv_from(&mut buf).await, then immediately calls tokio::spawn to create a new task for each packet.
  • There is no limit on task count, so an attacker sending a flood of packets can spawn unlimited tasks and exhaust CPU, memory, and file handles.
  • No backpressure: if processing is slow (e.g., upstream resolution delays), tasks pile up.
  • No rate limiting: accepts all traffic indiscriminately.

This is a classic unbounded queue problem in async systems—hello, OOM kills and unresponsive servers.

Best Way to Address It

The optimal fix uses a bounded worker pool with Tokio's primitives for backpressure and concurrency limiting:

  • Use a bounded channel (e.g., mpsc::channel with a fixed capacity) to queue incoming packets. If the channel is full, drop or delay new packets (backpressure).
  • Spawn a fixed number of worker tasks (e.g., based on CPU cores) that pull from the channel and process queries. This bounds concurrency.
  • Add rate limiting using a library like governor or a simple token bucket to throttle incoming requests per IP or globally.
  • Why this is best: It's efficient, leverages Tokio's async ecosystem, prevents DoS without complex external tools, and scales well. There is no global semaphore gating every request; the only shared lock is the brief one workers take to pull the next packet off the queue.

Dependencies to Add: Add governor = "0.6" and num_cpus = "1" to Cargo.toml for rate limiting and worker sizing (nonzero_ext = "0.3" is optional if you prefer the nonzero! macro over NonZeroU32::new). If minimalism is key, you can skip governor and hand-roll a simple token bucket, as sketched below.
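
If you'd rather keep dependencies minimal, a global token bucket is easy to hand-roll with nothing but std. This is only a sketch (the capacity and refill rate are illustrative, not values taken from the project):

    use std::time::Instant;

    // Minimal global token bucket: `capacity` tokens, refilled at `rate` tokens per second.
    struct TokenBucket {
        capacity: f64,
        tokens: f64,
        rate: f64,
        last_refill: Instant,
    }

    impl TokenBucket {
        fn new(capacity: f64, rate: f64) -> Self {
            Self { capacity, tokens: capacity, rate, last_refill: Instant::now() }
        }

        // Returns true if a token was available (request allowed), false otherwise.
        fn try_acquire(&mut self) -> bool {
            // Refill proportionally to the time elapsed since the last call, capped at capacity
            let elapsed = self.last_refill.elapsed().as_secs_f64();
            self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
            self.last_refill = Instant::now();

            if self.tokens >= 1.0 {
                self.tokens -= 1.0;
                true
            } else {
                false
            }
        }
    }

Workers would share one of these behind an Arc<Mutex<_>> (or keep one per worker) and call try_acquire() at the same spot where the governor check sits in the diff below.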

Proposed Code Changes:
Rewrite the main loop in src/main.rs to use a channel and workers. Here's a diff for clarity (apply via apply_diff tool if desired):

<<<<<<< SEARCH
:start_line:44
-------
    let mut buf = [0; 1024]; // Buffer for incoming packets

    info!("DNS server listening on 0.0.0.0:2053");

    loop {
        let (len, addr) = sock.recv_from(&mut buf).await?;

        let packet_data = buf[..len].to_vec();
        let sock_clone = Arc::clone(&sock); // Arc<UdpSocket>
        let query_handle = query_actor_handle.clone(); // Clone the actor handle
                                                       // let sock_clone = sock.clone(); // Arc<UdpSocket>

        // Spawn a new task to process the DNS query
        tokio::spawn(async move {
            process_dns_query(packet_data, addr, query_handle, sock_clone).await;
        });
    }
=======
    use std::num::NonZeroU32;
    use governor::{Quota, RateLimiter};
    use tokio::sync::{mpsc, Mutex};

    // ... (rest of imports)

    // In main:
    let mut buf = [0; 4096]; // Increase buffer for EDNS support

    info!("DNS server listening on 0.0.0.0:2053");

    // Bounded channel for backpressure (queue up to 100 packets)
    let (tx, rx) = mpsc::channel(100);
    // tokio's mpsc receiver cannot be cloned, so share it between workers behind a Mutex
    let rx = Arc::new(Mutex::new(rx));

    // Rate limiter: 100 requests per second globally (adjust as needed)
    let limiter = Arc::new(RateLimiter::direct(Quota::per_second(NonZeroU32::new(100).unwrap())));

    // Spawn a fixed pool of workers (one per CPU core)
    let num_workers = num_cpus::get();
    for _ in 0..num_workers {
        let rx = Arc::clone(&rx);
        let sock_clone = Arc::clone(&sock);
        let query_handle = query_actor_handle.clone();
        let limiter = Arc::clone(&limiter);

        tokio::spawn(async move {
            loop {
                // Only one worker waits on recv() at a time; processing still runs in parallel
                let next = rx.lock().await.recv().await;
                let Some((packet_data, addr)) = next else { break };

                // Drop the packet if the global rate limit has been exceeded
                if limiter.check().is_ok() {
                    process_dns_query(packet_data, addr, query_handle.clone(), sock_clone.clone()).await;
                } else {
                    tracing::warn!("Rate limit exceeded, dropping packet from {}", addr);
                }
            }
        });
    }

    loop {
        let (len, addr) = sock.recv_from(&mut buf).await?;
        let packet_data = buf[..len].to_vec();

        // Send to channel; if full, drop the packet (backpressure)
        if tx.try_send((packet_data, addr)).is_err() {
            tracing::warn!("Queue full, dropping packet from {}", addr);
        }
    }
>>>>>>> REPLACE
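
A hedged variant of the final receive loop above: if you'd rather slow ingestion down than drop in userspace, await send() so the bounded channel pushes back on the socket reader itself (overflow then accumulates in, and eventually falls out of, the kernel's UDP buffer):

    // Variant: wait for queue space instead of dropping immediately.
    // send() only errors once every worker (and thus the shared receiver) has shut down.
    loop {
        let (len, addr) = sock.recv_from(&mut buf).await?;
        let packet_data = buf[..len].to_vec();

        if tx.send((packet_data, addr)).await.is_err() {
            tracing::error!("Worker pool has shut down; exiting receive loop");
            break;
        }
    }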

Explanation of Fix:

  • Bounded Channel: Queues packets; if full (100), drops new ones, preventing unbounded growth.
  • Worker Pool: A fixed number of tasks (e.g., 8 on an 8-core machine) pull from the shared receiver (wrapped in a Mutex, since tokio's mpsc receiver cannot be cloned) and process queries in parallel, bounding concurrency.
  • Rate Limiting: Uses governor's check() to drop anything beyond 100 req/s; per-IP limits are possible with governor's keyed rate limiter (see the sketch after this list).
  • Buffer Size: Increased to 4096 for larger packets.
  • Benefits: Prevents DoS by limiting resources; adds backpressure to slow down floods. Easy to tune (change channel size or quota).
  • Trade-offs: Dropping packets on overload—fine for DNS (clients retry). For production, add metrics/logging.
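
For per-IP limiting, governor also ships a keyed rate limiter, so a HashMap of hand-managed limiters isn't strictly needed. A minimal sketch, assuming governor's default features (the 20 req/s per-client quota is illustrative, not a project value):

    use std::net::IpAddr;
    use std::num::NonZeroU32;
    use governor::{Quota, RateLimiter};

    // One keyed limiter shared by all workers; it keeps an independent bucket per client IP.
    let per_ip_limiter = Arc::new(RateLimiter::keyed(Quota::per_second(
        NonZeroU32::new(20).unwrap(),
    )));

    // Inside the worker loop, after pulling (packet_data, addr) off the queue,
    // replace the global limiter.check() with a per-key check:
    let client_ip: IpAddr = addr.ip();
    if per_ip_limiter.check_key(&client_ip).is_ok() {
        // process the query as before
    } else {
        tracing::warn!("Per-IP rate limit exceeded for {}", client_ip);
    }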

This refactor transforms the naive loop into a robust, scalable design. If you want me to apply this diff directly, confirm, and I'll use the apply_diff tool.

With this addressed, the updated report maintains the original critiques but notes this as a critical fix to mitigate the DoS risk. The project still needs major overhauls elsewhere, but this plugs a huge hole.
