Improving Concurrency in the Actor System #4

@devfire

Updated Critical Analysis Report: Improving Concurrency in the Actor System

Thanks for the feedback. I've expanded section 7 with concrete, actionable suggestions for fixing the toy-like actor system in src/actors/. The original critique stands: it's a single-threaded bottleneck pretending to be concurrent, with sequential processing through one mpsc channel, no parallelism, weak error handling (just log and return None), and no caching or timeouts, which hammers the upstream and scales poorly. Below I detail the best ways to improve it, prioritized by impact. Together these changes would turn it into a proper concurrent resolver with real resilience.

Key Improvements Suggested

  1. Introduce Parallelism with an Actor Pool:

    • Why? Currently, one actor processes all resolutions sequentially, creating a bottleneck for concurrent queries.
    • How: Spawn a pool of actors (e.g., 4-8, based on CPU cores) that share a single mpsc channel. Each actor pulls messages and resolves in parallel. Note that tokio::sync::mpsc receivers cannot be cloned, so either share the one receiver behind an Arc<tokio::sync::Mutex<...>> or switch to a multi-consumer channel crate such as async-channel or flume.
    • Benefits: True concurrency; handles multiple resolutions simultaneously without blocking.
    • Potential Drawbacks: Slightly more memory if each actor owned its own resolver; mitigate by sharing a single resolver behind an Arc (Hickory's async resolver is safe to share across tasks).
  2. Add Caching to Avoid Repeated Upstream Queries:

    • Why? No caching means every identical query hits upstream, wasting bandwidth and time.
    • How: Integrate a TTL-based cache (e.g., add ttl_cache = "0.5" to Cargo.toml). Cache results with a short TTL (e.g., 60s) keyed by domain. Check the cache before resolving.
    • Benefits: Reduces latency and load on upstream (e.g., 8.8.8.8 won't rate-limit you).
    • Implementation Tip: Use an Arc<tokio::sync::Mutex<ttl_cache::TtlCache<String, Vec<IpAddr>>>> so the whole pool shares one cache.
  3. Implement Timeouts and Retries:

    • Why? No timeouts mean hung resolutions block forever; no retries ignore transient failures.
    • How: Wrap resolver.lookup_ip in tokio::time::timeout (e.g., 5s). For retries, use a loop (up to 3 attempts) with exponential backoff. Hickory's ResolverOpts also exposes timeout and attempts settings if you'd rather configure this at the resolver level.
    • Benefits: Makes the system robust to network flakes; prevents indefinite hangs.
  4. Enhance Error Handling:

    • Why? Current handling is lazy—just log and return None, losing context.
    • How: Define a custom ResolveError enum with variants (e.g., Timeout, NetworkError, NoRecords). Propagate errors to callers for better logging/response codes (e.g., set RCODE=2 for SERVFAIL).
    • Benefits: Callers (like processor.rs) can react intelligently, e.g., fall back to another upstream. A sketch of such an error type follows this list.
  5. Optional: Switch to a Real Actor Framework:

    • Why? The custom mpsc setup is reinventing the wheel poorly.
    • How: Migrate to Actix (add actix = "0.13"). Define an Actix actor for resolution with a supervisor for restarts.
    • Benefits: Built-in supervision, easier scaling. But if keeping it lightweight, stick with Tokio enhancements.
    • When to Do This: If the project grows beyond simple DNS; otherwise, the above fixes suffice.
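
To make item 4 concrete, here is a minimal sketch of a dedicated error type. It is assumption-heavy: thiserror, the variant names, and the rcode_for helper don't exist in the repo today, and the actor's respond_to channel would have to carry a Result instead of an Option.

use std::net::IpAddr;
use std::time::Duration;
use thiserror::Error; // assumed new dependency

// Hypothetical error type returned by the resolver actors.
#[derive(Debug, Error)]
pub enum ResolveError {
    #[error("lookup timed out after {0:?}")]
    Timeout(Duration),
    #[error("network/upstream error: {0}")]
    Network(String),
    #[error("no records found for {0}")]
    NoRecords(String),
}

// How a caller such as processor.rs could map the outcome to a DNS RCODE.
pub fn rcode_for(result: &Result<Vec<IpAddr>, ResolveError>) -> u8 {
    match result {
        Ok(_) => 0,                           // NOERROR
        Err(ResolveError::NoRecords(_)) => 3, // NXDOMAIN
        Err(_) => 2,                          // SERVFAIL
    }
}

And for item 5, a rough shape of the Actix variant (actix 0.13 style; ResolverActor, the Resolve message, and the elided Hickory call are placeholders, not project code):

use actix::prelude::*;
use std::net::IpAddr;

// Hypothetical message; mirrors QueryActorMessage::Resolve.
#[derive(Message)]
#[rtype(result = "Option<Vec<IpAddr>>")]
pub struct Resolve(pub String);

// The Hickory resolver would live on this struct; elided to keep the sketch short.
pub struct ResolverActor;

impl Actor for ResolverActor {
    type Context = Context<Self>;
}

// Supervised lets a supervisor restart the actor if it stops or panics.
impl Supervised for ResolverActor {}

impl Handler<Resolve> for ResolverActor {
    type Result = ResponseFuture<Option<Vec<IpAddr>>>;

    fn handle(&mut self, Resolve(name): Resolve, _ctx: &mut Context<Self>) -> Self::Result {
        Box::pin(async move {
            // This is where resolver.lookup_ip(&name).await would go.
            let _ = name;
            None
        })
    }
}

// Started under a supervisor instead of a plain Actor::start:
//   let addr = Supervisor::start(|_| ResolverActor);
//   let ips = addr.send(Resolve("example.com".into())).await;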

Proposed Code Changes

Here's a sample refactor for src/actors/query_actor.rs and src/handlers/query_handler.rs. Apply via apply_diff if you want (I'd suggest testing first).

For query_actor.rs (make the actor pool-friendly; add caching, timeouts, and retries):

<<<<<<< SEARCH
:start_line:13
-------
pub struct QueryActor {
    // The receiver for incoming messages
    receiver: mpsc::Receiver<QueryActorMessage>,
    // The resolver used to resolve DNS queries
    resolver: Resolver<TokioConnectionProvider>,
}

impl QueryActor {
    // Constructor for the actor
    pub fn new(
        receiver: mpsc::Receiver<QueryActorMessage>,
        resolver: Resolver<TokioConnectionProvider>,
    ) -> Self {
        // Return a new actor with the given receiver and an empty key-value hash map
        Self { receiver, resolver }
    }

    // Run the actor
    pub async fn run(&mut self) {
        // Continuously receive messages and handle them
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg).await;
        }
    }

    // Handle a message
    async fn handle_message(&self, msg: QueryActorMessage) {
        match msg {
            QueryActorMessage::Resolve { name, respond_to } => {
                let lookup_result: Result<LookupIp, ResolveError> =
                    self.resolver.lookup_ip(&name).await;
                match lookup_result {
                    Ok(lookup) => {
                        // Collect all IP addresses (both IPv4 and IPv6) from the lookup.
                        // When you call resolver.lookup_ip(&name), the returned LookupIp type is not a simple collection of data.
                        // It's an iterator that is tied to the lifetime of the resolver and the name it was called with.
                        // We need to collect the IP addresses into a Vec<IpAddr>.
                        let ips: Vec<IpAddr> = lookup.iter().collect();

                        if !ips.is_empty() {
                            let _ = respond_to.send(Some(ips));
                        } else {
                            // If the lookup was successful but returned no IPs
                            let _ = respond_to.send(None);
                        }
                    }
                    Err(e) => {
                        error!("DNS lookup failed for {}: {}", name, e);
                        let _ = respond_to.send(None);
                    }
                }
            }
        }
    }
}
=======
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::{self, timeout};
use ttl_cache::TtlCache;

// ... (existing imports)

pub struct QueryActor {
    // tokio's mpsc::Receiver is not Clone, so the whole pool shares one receiver behind a Mutex.
    receiver: Arc<Mutex<mpsc::Receiver<QueryActorMessage>>>,
    // The resolver is shared across the pool via Arc.
    resolver: Arc<Resolver<TokioConnectionProvider>>,
    // TTL cache shared by every actor in the pool.
    cache: Arc<Mutex<TtlCache<String, Vec<IpAddr>>>>,
}

impl QueryActor {
    pub fn new(
        receiver: Arc<Mutex<mpsc::Receiver<QueryActorMessage>>>,
        resolver: Arc<Resolver<TokioConnectionProvider>>,
        cache: Arc<Mutex<TtlCache<String, Vec<IpAddr>>>>,
    ) -> Self {
        Self { receiver, resolver, cache }
    }

    pub async fn run(&mut self) {
        loop {
            // Hold the receiver lock only long enough to pull the next message,
            // so the other actors can keep pulling while this one resolves.
            let msg = self.receiver.lock().await.recv().await;
            match msg {
                Some(msg) => self.handle_message(msg).await,
                None => break, // Channel closed; shut this actor down.
            }
        }
    }

    async fn handle_message(&self, msg: QueryActorMessage) {
        match msg {
            QueryActorMessage::Resolve { name, respond_to } => {
                // Check the cache first, releasing the lock before going upstream.
                {
                    let mut cache = self.cache.lock().await;
                    if let Some(cached_ips) = cache.get(&name) {
                        let _ = respond_to.send(Some(cached_ips.clone()));
                        return;
                    }
                }

                // Resolve with a 5s timeout and up to 3 attempts with backoff.
                let mut attempts = 0;
                let ips = loop {
                    attempts += 1;
                    match timeout(Duration::from_secs(5), self.resolver.lookup_ip(&name)).await {
                        Ok(Ok(lookup)) => {
                            let ips: Vec<IpAddr> = lookup.iter().collect();
                            if !ips.is_empty() {
                                self.cache.lock().await.insert(
                                    name.clone(),
                                    ips.clone(),
                                    Duration::from_secs(60),
                                );
                                break Some(ips);
                            } else {
                                break None;
                            }
                        }
                        Ok(Err(e)) => {
                            error!("Lookup error for {} (attempt {}): {}", name, attempts, e);
                            if attempts >= 3 { break None; }
                            time::sleep(Duration::from_millis(100 * attempts as u64)).await; // Backoff
                        }
                        Err(_) => {
                            error!("Timeout for {} (attempt {})", name, attempts);
                            if attempts >= 3 { break None; }
                            time::sleep(Duration::from_millis(100 * attempts as u64)).await;
                        }
                    }
                };

                let _ = respond_to.send(ips);
            }
        }
    }
}
>>>>>>> REPLACE

For query_handler.rs (spawn the pool; this needs num_cpus in Cargo.toml plus Arc, tokio::sync::Mutex, and ttl_cache::TtlCache imports at the top of the file):

<<<<<<< SEARCH
:start_line:17
-------
impl QueryActorHandle {
    pub fn new(resolver: Resolver<TokioConnectionProvider>) -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let mut actor = QueryActor::new(receiver, resolver);
        tokio::spawn(async move { actor.run().await });

        Self { sender }
    }

    /// Resolves a DNS name to an IPv4 address.
    pub async fn resolve(&self, name: String) -> Option<Vec<IpAddr>> {
        let (send, recv) = oneshot::channel();
        let msg = QueryActorMessage::Resolve {
            name,
            respond_to: send,
        };

        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check the
        // failure twice.
        let _ = self.sender.send(msg).await;

        // this is going back once the msg comes back from the actor.
        // NOTE: we might get None back, i.e. no value for the given key.
        if let Some(ips) = recv.await.expect("Actor task has been killed") {
            Some(ips)
        } else {
            None
        }
    }
}
=======
impl QueryActorHandle {
    pub fn new(resolver: Resolver<TokioConnectionProvider>) -> Self {
        let (sender, receiver) = mpsc::channel(32); // Larger channel for the pool

        // tokio's mpsc::Receiver cannot be cloned, so the pool shares it (and the
        // resolver and cache) behind Arc/Mutex.
        let receiver = Arc::new(Mutex::new(receiver));
        let resolver = Arc::new(resolver);
        let cache: Arc<Mutex<TtlCache<String, Vec<IpAddr>>>> =
            Arc::new(Mutex::new(TtlCache::new(1000)));

        // Spawn a pool of actors, sized from the CPU count (at least one).
        let num_actors = (num_cpus::get() / 2).max(1);
        for _ in 0..num_actors {
            let receiver = Arc::clone(&receiver);
            let resolver = Arc::clone(&resolver);
            let cache = Arc::clone(&cache);
            tokio::spawn(async move {
                let mut actor = QueryActor::new(receiver, resolver, cache);
                actor.run().await;
            });
        }

        Self { sender }
    }

    pub async fn resolve(&self, name: String) -> Option<Vec<IpAddr>> {
        // ... (existing code, unchanged)
    }
}
>>>>>>> REPLACE
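
One design note on the pool plumbing above: tokio's mpsc::Receiver is single-consumer, so the sketch shares one receiver behind Arc<tokio::sync::Mutex<...>>, and each actor holds that lock only while waiting for its next message, so the actual resolving still overlaps. If you'd rather avoid the lock entirely, a multi-consumer channel crate such as async-channel or flume gives you genuinely cloneable receivers as a near drop-in swap.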

Additional Tips:

  • Testing: Add benchmarks for concurrent resolutions (e.g., via criterion) to verify improvements; a minimal smoke test is sketched after these tips.
  • Monitoring: Integrate metrics (e.g., prometheus) to track cache hits, resolution times, and error rates.
  • Edge Cases: Handle cache invalidation on errors, and add per-domain locking if contention arises.
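
As a quicker sanity check before setting up criterion, a plain tokio test that pushes a batch of names through the handle at once will show whether the pool overlaps work. In the sketch below, make_test_handle is a hypothetical helper (build the QueryActorHandle however the rest of the project does), and futures would need to be a dev-dependency.

#[tokio::test]
async fn resolves_many_names_concurrently() {
    // Hypothetical helper that constructs a QueryActorHandle with a Hickory resolver.
    let handle = make_test_handle();

    let names = ["example.com", "rust-lang.org", "github.com"];
    // Kick off all resolutions at once; with the pool they run in parallel
    // rather than one after another.
    let results =
        futures::future::join_all(names.iter().map(|n| handle.resolve(n.to_string()))).await;

    assert_eq!(results.len(), names.len());
    assert!(results.iter().any(|r| r.is_some()), "expected at least one name to resolve");
}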

These changes make the actor system genuinely concurrent and far more efficient. If you implement them, re-run your tests; the resolver should no longer be the bottleneck. Let me know if you want me to apply these diffs or suggest more.
