From d8355892b64592547256401e95f32995eb5378bb Mon Sep 17 00:00:00 2001
From: Igor Kantor
Date: Mon, 14 Jul 2025 20:01:41 -0400
Subject: [PATCH] initial multithread impl

---
 MULTITHREADED_IMPLEMENTATION_PLAN.md | 341 +++++++++++++++++++++++++++
 src/main.rs                          | 162 ++-----------
 src/processor.rs                     | 162 +++++++++++++
 3 files changed, 522 insertions(+), 143 deletions(-)
 create mode 100644 MULTITHREADED_IMPLEMENTATION_PLAN.md
 create mode 100644 src/processor.rs

diff --git a/MULTITHREADED_IMPLEMENTATION_PLAN.md b/MULTITHREADED_IMPLEMENTATION_PLAN.md
new file mode 100644
index 0000000..4ac2ce2
--- /dev/null
+++ b/MULTITHREADED_IMPLEMENTATION_PLAN.md
@@ -0,0 +1,341 @@
+# Multithreaded DNS Server Implementation Plan
+
+## Overview
+This document outlines the plan to transform the current single-threaded DNS server into a truly multithreaded, high-performance DNS resolver capable of handling thousands of concurrent queries.
+
+## Current Architecture Analysis
+
+**Current Bottleneck**: The main loop in `main.rs:75-211` processes DNS queries sequentially:
+```rust
+loop {
+    let (len, addr) = sock.recv_from(&mut buf).await?; // Awaits the next packet; nothing else runs until this query is finished
+    // Process packet synchronously
+    // Send response
+}
+```
+
+**Performance Impact**: Each query holds up the entire server until it completes, capping throughput at roughly 100 queries/sec.
+
+## 1. Core Architecture Changes
+
+### Main Loop Refactoring (`main.rs:75-211`)
+**Objective**: Replace the blocking loop with a task-spawning architecture
+
+**Current Implementation**:
+- Sequential processing in the main loop
+- Each query waits for the previous query to complete
+- Single point of failure
+
+**New Implementation**:
+```rust
+loop {
+    let (len, addr) = sock.recv_from(&mut buf).await?;
+    let packet_data = buf[..len].to_vec();
+    let query_handle = query_actor_handle.clone();
+    let sock_clone = sock.clone(); // Arc<UdpSocket>
+
+    tokio::spawn(async move {
+        process_dns_query(packet_data, addr, query_handle, sock_clone).await;
+    });
+}
+```
+
+**Benefits**:
+- Each incoming UDP packet spawns an independent task
+- The main loop becomes pure packet reception and task dispatch
+- Multiple queries are processed concurrently
+
+### Task Isolation Strategy
+- Create a `process_dns_query()` function to handle individual queries
+- Move packet decoding, resolution, and response logic into spawned tasks
+- Ensure each task is self-contained and isolated
+
+## 2. Shared State Management
+
+### UdpSocket Sharing
+**Challenge**: Multiple tasks need to send responses via the same socket
+
+**Solution**:
+- Wrap the `UdpSocket` in an `Arc<UdpSocket>` for thread-safe sharing
+- Each spawned task gets a cloned reference for sending its response
+- Tokio's `UdpSocket` already supports concurrent operations through a shared reference (see the sketch at the end of this section)
+
+### QueryActorHandle Management
+**Current State**: Already implements `Clone`
+**Strategy**:
+- Safe to clone across tasks
+- Consider connection pooling for better throughput
+- Each task gets an independent handle to the actor system
+
+### Thread Safety Considerations
+- All shared components use tokio's async-safe primitives
+- No additional synchronization is required for the basic implementation
+- The actor pattern already provides message-passing concurrency
+
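+The sketch below illustrates the shared-socket point above (illustrative only, not part of this patch): several tasks sending through one `Arc<UdpSocket>` concurrently, which tokio permits because `send_to` takes `&self`.
+
+```rust
+use std::sync::Arc;
+use tokio::net::UdpSocket;
+
+#[tokio::main]
+async fn main() -> std::io::Result<()> {
+    // One socket shared behind an Arc; each task receives its own clone of the Arc.
+    let sock = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
+    let target: std::net::SocketAddr = "127.0.0.1:2053".parse().unwrap();
+
+    let mut handles = Vec::new();
+    for i in 0..4u8 {
+        let sock = Arc::clone(&sock);
+        handles.push(tokio::spawn(async move {
+            // No locking is needed around the socket; tokio handles concurrent sends.
+            sock.send_to(&[i], target).await
+        }));
+    }
+    for h in handles {
+        h.await.unwrap()?;
+    }
+    Ok(())
+}
+```
+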
+## 3. Connection Pooling Strategy
+
+### Multiple QueryActor Instances
+**Objective**: Eliminate the single-QueryActor bottleneck
+
+**Current Limitation**: A single QueryActor processes all DNS resolutions
+**Solution**: Create a pool of QueryActor instances
+
+```rust
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+struct QueryActorPool {
+    actors: Vec<QueryActorHandle>,
+    current: AtomicUsize,
+    size: usize,
+}
+
+impl QueryActorPool {
+    fn new(pool_size: usize, resolver: Resolver) -> Self {
+        let actors = (0..pool_size)
+            .map(|_| QueryActorHandle::new(resolver.clone()))
+            .collect();
+
+        Self {
+            actors,
+            current: AtomicUsize::new(0),
+            size: pool_size,
+        }
+    }
+
+    fn get_actor(&self) -> &QueryActorHandle {
+        let index = self.current.fetch_add(1, Ordering::Relaxed) % self.size;
+        &self.actors[index]
+    }
+}
+```
+
+### Distribution Strategies
+- **Round-robin**: Simple, even distribution
+- **Least-busy**: More complex, better load balancing
+- **Recommended**: Start with round-robin, optimize later
+
+### Pool Sizing
+- **Recommended**: 10-50 actors depending on expected load
+- **Consideration**: Each actor maintains its own resolver connection
+- **Tuning**: Monitor connection pool utilization
+
+## 4. Resource Management
+
+### Task Limits
+**Problem**: Unlimited task spawning can exhaust system resources
+
+**Solution**: Implement semaphore-based limiting
+```rust
+use tokio::sync::Semaphore;
+
+static QUERY_SEMAPHORE: Semaphore = Semaphore::const_new(1000);
+
+// In the main loop:
+let permit = QUERY_SEMAPHORE.acquire().await.unwrap();
+tokio::spawn(async move {
+    let _permit = permit; // Hold the permit for the task's duration
+    process_dns_query(packet_data, addr, query_handle, sock_clone).await;
+});
+```
+
+**Recommended Limits**:
+- Development: 100-500 concurrent queries
+- Production: 1000-5000 concurrent queries
+- Adjust based on available memory and CPU cores
+
+### Memory Management
+**Optimizations**:
+- Use `BytesMut` efficiently to avoid excessive allocations
+- Consider buffer pooling for high-throughput scenarios
+- Monitor memory usage per task
+
+**Buffer Pool Implementation** (Optional):
+```rust
+use bytes::BytesMut;
+use tokio::sync::Mutex;
+
+struct BufferPool {
+    buffers: Mutex<Vec<BytesMut>>,
+    buffer_size: usize,
+}
+```
+
+## 5. Error Handling & Resilience
+
+### Task Isolation
+**Benefits**:
+- Failed tasks don't crash the entire server
+- Graceful handling of malformed packets
+- Individual query timeouts
+
+**Implementation**:
+```rust
+tokio::spawn(async move {
+    let result = tokio::time::timeout(
+        Duration::from_secs(30),
+        process_dns_query(packet_data, addr, query_handle, sock_clone)
+    ).await;
+
+    match result {
+        Ok(_) => {}, // Success
+        Err(_) => tracing::warn!("Query timeout for {}", addr),
+    }
+});
+```
+
+### Graceful Degradation
+**Circuit Breaker Pattern** (a minimal sketch appears at the end of this section):
+- Track upstream DNS resolver failures
+- Temporarily disable failing resolvers
+- Fall back to alternative resolvers
+
+**Fallback Responses**:
+- Return SERVFAIL for resolution failures
+- Implement basic error response generation
+- Log errors for monitoring
+
+### Timeout Management
+**Query Timeouts**: 5-30 seconds per query
+**Resolution Timeouts**: Configure in hickory-resolver
+**Connection Timeouts**: Handle at the actor level
+
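+A minimal circuit-breaker sketch for the graceful-degradation idea above. The type and thresholds are illustrative, not part of the existing codebase: after a run of consecutive upstream failures the breaker opens for a cooldown period, during which the caller should skip that resolver and use a fallback.
+
+```rust
+use std::sync::Mutex;
+use std::time::{Duration, Instant};
+
+struct CircuitBreaker {
+    state: Mutex<BreakerState>,
+    max_failures: u32,
+    cooldown: Duration,
+}
+
+struct BreakerState {
+    consecutive_failures: u32,
+    open_until: Option<Instant>,
+}
+
+impl CircuitBreaker {
+    fn new(max_failures: u32, cooldown: Duration) -> Self {
+        Self {
+            state: Mutex::new(BreakerState { consecutive_failures: 0, open_until: None }),
+            max_failures,
+            cooldown,
+        }
+    }
+
+    /// Returns true if this upstream may be used right now.
+    fn allow(&self) -> bool {
+        let mut s = self.state.lock().unwrap();
+        match s.open_until {
+            Some(until) if Instant::now() < until => false, // still cooling down
+            _ => {
+                s.open_until = None;
+                true
+            }
+        }
+    }
+
+    fn record_success(&self) {
+        self.state.lock().unwrap().consecutive_failures = 0;
+    }
+
+    fn record_failure(&self) {
+        let mut s = self.state.lock().unwrap();
+        s.consecutive_failures += 1;
+        if s.consecutive_failures >= self.max_failures {
+            // Open the breaker: skip this resolver until the cooldown expires.
+            s.open_until = Some(Instant::now() + self.cooldown);
+            s.consecutive_failures = 0;
+        }
+    }
+}
+```
+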
+## 6. Performance Optimizations
+
+### Async I/O Efficiency
+**Current**: Sequential UDP send/receive on a single task
+**Optimization**: Leverage tokio's async I/O
+
+**Advanced Options**:
+- Consider an `io_uring` backend on Linux for maximum performance
+- Batch response sending where possible
+- Use vectored I/O for multiple responses
+
+### Future Enhancements
+**Caching Layer**:
+- Add Redis or an in-memory cache for frequent queries
+- TTL-based cache invalidation
+- Cache hit ratio monitoring
+
+**Load Balancing**:
+- Multiple upstream DNS servers
+- Health checking and failover
+- Geographic DNS routing
+
+## 7. Monitoring & Metrics
+
+### Concurrent Operations Tracking
+**Key Metrics**:
+- Active task count
+- Query processing latency (p50, p95, p99)
+- Throughput (queries per second)
+- Upstream resolver health
+
+**Implementation**:
+```rust
+use std::sync::atomic::{AtomicU64, Ordering};
+
+static ACTIVE_QUERIES: AtomicU64 = AtomicU64::new(0);
+static TOTAL_QUERIES: AtomicU64 = AtomicU64::new(0);
+
+// In query processing:
+ACTIVE_QUERIES.fetch_add(1, Ordering::Relaxed);
+// ... process query ...
+ACTIVE_QUERIES.fetch_sub(1, Ordering::Relaxed);
+TOTAL_QUERIES.fetch_add(1, Ordering::Relaxed);
+```
+
+### Resource Monitoring
+**System Metrics**:
+- Memory usage per task
+- Connection pool utilization
+- Error rates by query type
+- Response time distribution
+
+**Logging Strategy**:
+- Structured logging with tracing
+- Query-level tracing spans
+- Performance metrics export
+
+## 8. Implementation Phases
+
+### Phase 1: Basic Concurrency (Critical)
+**Priority**: Immediate
+**Components**:
+- Task spawning in the main loop
+- Shared `UdpSocket` behind an `Arc`
+- Basic error handling
+- Simple resource limits
+
+**Expected Improvement**: 10x throughput increase
+
+### Phase 2: Connection Pooling (Important)
+**Priority**: Short-term
+**Components**:
+- QueryActor pool implementation
+- Round-robin distribution
+- Advanced resource management
+- Timeout handling
+
+**Expected Improvement**: 5x additional throughput increase
+
+### Phase 3: Advanced Features (Enhancement)
+**Priority**: Medium-term
+**Components**:
+- Caching layer
+- Advanced monitoring
+- Circuit breaker pattern
+- Performance optimizations
+
+**Expected Improvement**: 2-3x additional throughput increase
+
+## 9. Testing Strategy
+
+### Load Testing
+**Tools**: Use `dig` with scripts or specialized DNS load-testing tools
+**Scenarios**:
+- Concurrent query burst testing (a minimal Rust sketch follows at the end of this section)
+- Sustained load testing
+- Failure scenario testing
+
+### Performance Benchmarks
+**Baseline**: Current single-threaded performance
+**Targets**:
+- Phase 1: 1,000+ concurrent queries
+- Phase 2: 5,000+ concurrent queries
+- Phase 3: 10,000+ concurrent queries
+
+### Integration Testing
+**Scenarios**:
+- Multiple client connections
+- Mixed query types (A, AAAA, CNAME)
+- Upstream DNS server failures
+- Network latency simulation
+
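+A burst-test sketch (illustrative only): it fires `n` concurrent A-record queries for `example.com` at the server and waits for each response. The query bytes are a single hand-encoded DNS packet; a real harness would randomize IDs and names and watch out for local file-descriptor limits.
+
+```rust
+use tokio::net::UdpSocket;
+
+// 29-byte DNS query: header (ID 0x1234, RD set, QDCOUNT 1), QNAME example.com, QTYPE A, QCLASS IN.
+const QUERY: &[u8] = &[
+    0x12, 0x34, 0x01, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+    7, b'e', b'x', b'a', b'm', b'p', b'l', b'e', 3, b'c', b'o', b'm', 0,
+    0x00, 0x01, 0x00, 0x01,
+];
+
+async fn burst(n: usize) -> std::io::Result<()> {
+    let mut tasks = Vec::with_capacity(n);
+    for _ in 0..n {
+        tasks.push(tokio::spawn(async {
+            // Each simulated client uses its own ephemeral socket.
+            let sock = UdpSocket::bind("127.0.0.1:0").await?;
+            sock.send_to(QUERY, "127.0.0.1:2053").await?;
+            let mut buf = [0u8; 512];
+            let (len, _) = sock.recv_from(&mut buf).await?;
+            Ok::<usize, std::io::Error>(len)
+        }));
+    }
+    for t in tasks {
+        t.await.expect("load task panicked")?;
+    }
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> std::io::Result<()> {
+    burst(1000).await
+}
+```
+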
+## 10. Configuration
+
+### Environment Variables
+```bash
+DNS_SERVER_MAX_CONCURRENT_QUERIES=1000
+DNS_SERVER_ACTOR_POOL_SIZE=20
+DNS_SERVER_QUERY_TIMEOUT_SECS=30
+DNS_SERVER_ENABLE_CACHING=true
+```
+
+### Runtime Configuration
+- Adjustable limits without restart
+- Pool size modification
+- Timeout adjustments
+- Feature toggles
+
+## Expected Performance Outcomes
+
+**Current Performance**: ~100 queries/second (sequential)
+**Phase 1 Target**: 1,000+ queries/second (10x improvement)
+**Phase 2 Target**: 5,000+ queries/second (50x improvement)
+**Phase 3 Target**: 10,000+ queries/second (100x improvement)
+
+**Resource Requirements**:
+- Memory: 2-8 GB depending on concurrent load
+- CPU: 2-8 cores for optimal performance
+- Network: Gigabit connection recommended
+
+## Conclusion
+
+This implementation plan transforms the DNS server from a simple sequential processor into a high-performance, concurrent DNS resolver capable of handling enterprise-level query loads. The phased approach allows for incremental improvements while maintaining system stability and providing measurable performance gains at each stage.
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index c68e1c6..5a7bb4b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ mod cli;
 mod codec;
 mod errors;
 mod parsers;
+mod processor;
 mod protocol;
 mod response_builder;
@@ -9,22 +10,22 @@ mod actors;
 mod handlers;
 use crate::handlers::query_handler::QueryActorHandle;
+use crate::processor::process_dns_query;
 use std::net::{Ipv4Addr, SocketAddr};
-use bytes::BytesMut;
-use codec::DnsCodec;
+
 use hickory_resolver::{
     config::{NameServerConfig, ResolverConfig},
     name_server::TokioConnectionProvider,
     proto::xfer::Protocol,
     Resolver,
 };
-use response_builder::DnsResponseBuilder;
+
 use tokio::net::UdpSocket;
-use tokio_util::codec::{Decoder, Encoder};
-use tracing::{debug, error, info, Level};
+
+use tracing::{info, Level};
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -39,7 +40,8 @@ async fn main() -> anyhow::Result<()> {
     let args = cli::Args::parse_args();
-    let sock = UdpSocket::bind("0.0.0.0:2053").await?;
+    use std::sync::Arc;
+    let sock = Arc::new(UdpSocket::bind("0.0.0.0:2053").await?);
     let resolver_ip_port = args.resolver().unwrap_or(SocketAddr::new(
         std::net::IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
@@ -66,147 +68,21 @@ async fn main() -> anyhow::Result<()> {
     // Create a new actor handle for the query actor.
     let query_actor_handle = QueryActorHandle::new(resolver.clone());
-    // Create a new DNS codec instance.
- let mut codec = DnsCodec::new(); - let mut buf = [0; 1024]; + let mut buf = [0; 1024]; // Buffer for incoming packets info!("DNS server listening on 0.0.0.0:2053"); loop { let (len, addr) = sock.recv_from(&mut buf).await?; - debug!("Received {} bytes from {}", len, addr); - - // Create a BytesMut from the received data - let mut bytes_mut = BytesMut::from(&buf[..len]); - - // Use the codec to decode the DNS packet - match codec.decode(&mut bytes_mut) { - Ok(Some(packet)) => { - debug!( - "Successfully decoded DNS packet from {}: {:?}", - addr, packet.header - ); - - debug!( - target: "dns_server::packet_details", - packet_id = packet.header.id, - query_response = if packet.header.qr { "Response" } else { "Query" }, - opcode = packet.header.opcode, // This can be mapped to a string if needed - // opcode = match packet.header.opcode { - // 0 => "QUERY", - // 1 => "IQUERY", - // 2 => "STATUS", - // _ => "RESERVED" - // }, - authoritative = packet.header.aa, - truncated = packet.header.tc, - recursion_desired = packet.header.rd, - recursion_available = packet.header.ra, - response_code = match packet.header.rcode { - 0 => "NOERROR", - 1 => "FORMERR", - 2 => "SERVFAIL", - 3 => "NXDOMAIN", - 4 => "NOTIMP", - 5 => "REFUSED", - _ => "UNKNOWN" - }, - question_count = packet.header.qdcount, - answer_count = packet.header.ancount, - authority_count = packet.header.nscount, - additional_count = packet.header.arcount, - "DNS packet header parsed successfully" - ); - - // Create a DNS response packet - // let response_packet = create_dns_response(packet); - - // Alternative using builder pattern (more flexible): - // let response_packet = response_builder.build_response(&packet); - // - // Or with custom settings and domain: - /* - NOTE: When using the fluent interface with ResponseBuilder, - we need to call at least one with_*_record() method (like with_a_record(), with_aaaa_record(), etc.) 
to add questions, - otherwise the builder falls back to using the original query's questions - */ - // let mut response_builder = DnsResponseBuilder::new().build_custom_response(&packet); - - // Create a new builder for each request (thread-safe) - let mut dns_response_builder = DnsResponseBuilder::new(); - - let response_builder_fluent = dns_response_builder - .build_custom_response(&packet) - // leave Packet Identifier (ID) intact - .with_qr(true) // Set QR bit to true for response - // Leave Opcode as is (same as request) - .with_authoritative(false) // Set AA bit to false (not authoritative) - // Leave TC bit as is (not truncated) - // Leave RD bit as is (recursion desired) - .with_recursion_available(false) - // Set RA bit to false (recursion not available) - .with_z(0); // Reserved bits set to 0 - // .with_rcode(0) // NOERROR - // NOTE: rcode is 0 (no error) if OPCODE is 0 (standard query) else 4 (not implemented) - // .with_an_answer("", Ipv4Addr::new(1, 1, 1, 1), 3600) - // .build(); - - // Iterate over the questions in the original packet - // and add them to the response packet - // debug!("Processing {} questions", packet.questions.len()); - let mut response_builder_chain = response_builder_fluent; - - for question in packet.questions.iter() { - // `resolve` now returns an Option> - if let Some(ip_addrs) = query_actor_handle.resolve(question.name.clone()).await - { - if ip_addrs.is_empty() { - error!("Could not resolve {}: No IPs found", &question.name); - } else { - // Iterate over all returned IP addresses and add them to the response - for ip_addr in ip_addrs { - info!("Resolved {} -> {}", &question.name, ip_addr); - response_builder_chain = response_builder_chain.with_an_answer( - &question.name, - ip_addr, // This is already an IpAddr - 60, - ); - } - } - } else { - error!("Could not resolve {}: Lookup failed", &question.name); - // Optionally, set the RCODE to NXDOMAIN or similar - } - } - - let response_packet = response_builder_chain.build(); - // Other examples (commented out): - // Direct domain response: response_builder.build_domain_response("example.com", packet.header.id); - // Multiple domains: response_builder.build_multi_domain_response(&["google.com", "github.com"], packet.header.id); - // Different record types: .with_aaaa_record("ipv6.google.com"), .with_cname_record("www.example.com"), etc. 
-
-                // Encode the response packet
-                let mut response_buf = BytesMut::new();
-                match codec.encode(response_packet, &mut response_buf) {
-                    Ok(()) => {
-                        let response_len = sock.send_to(&response_buf, addr).await?;
-                        info!("Sent DNS response ({} bytes) to {}", response_len, addr);
-                    }
-                    Err(e) => {
-                        error!("Failed to encode DNS response for {}: {}", addr, e);
-                        // Fallback to echoing original data
-                        let response_len = sock.send_to(&buf[..len], addr).await?;
-                        info!("Fallback: echoed {} bytes back to {}", response_len, addr);
-                    }
-                }
-            }
-            Ok(None) => {
-                info!("Incomplete packet received from {}, ignoring", addr);
-            }
-            Err(e) => {
-                error!("Failed to decode DNS packet from {}: {}", addr, e);
-                // Continue processing other packets even if one fails
-            }
-        }
+
+        let packet_data = buf[..len].to_vec();
+        let sock_clone = Arc::clone(&sock); // Arc<UdpSocket>
+        let query_handle = query_actor_handle.clone(); // Clone the actor handle
+
+        // Spawn a new task to process the DNS query
+        tokio::spawn(async move {
+            process_dns_query(packet_data, addr, query_handle, sock_clone).await;
+        });
     }
 }
diff --git a/src/processor.rs b/src/processor.rs
new file mode 100644
index 0000000..97a3baa
--- /dev/null
+++ b/src/processor.rs
@@ -0,0 +1,162 @@
+use bytes::BytesMut;
+use std::{net::SocketAddr, sync::Arc};
+use tokio::net::UdpSocket;
+use tokio_util::codec::{Decoder, Encoder};
+use tracing::{debug, error, info};
+
+use crate::response_builder::DnsResponseBuilder;
+use crate::{codec::DnsCodec, handlers::query_handler::QueryActorHandle};
+
+/// Decode, resolve, and answer a single DNS query; each call runs in its own spawned task.
+pub async fn process_dns_query(
+    packet_data: Vec<u8>,
+    addr: SocketAddr,
+    query_handle: QueryActorHandle,
+    sock: Arc<UdpSocket>,
+) {
+    // Create a BytesMut from the received data
+    let mut bytes_mut = BytesMut::from(&packet_data[..]);
+
+    debug!("Received {} bytes from {}", packet_data.len(), addr);
+
+    // Create a new DNS codec instance.
+ let mut codec = DnsCodec::new(); + + // Use the codec to decode the DNS packet + match codec.decode(&mut bytes_mut) { + Ok(Some(packet)) => { + debug!( + "Successfully decoded DNS packet from {}: {:?}", + addr, packet.header + ); + + debug!( + target: "dns_server::packet_details", + packet_id = packet.header.id, + query_response = if packet.header.qr { "Response" } else { "Query" }, + opcode = packet.header.opcode, // This can be mapped to a string if needed + // opcode = match packet.header.opcode { + // 0 => "QUERY", + // 1 => "IQUERY", + // 2 => "STATUS", + // _ => "RESERVED" + // }, + authoritative = packet.header.aa, + truncated = packet.header.tc, + recursion_desired = packet.header.rd, + recursion_available = packet.header.ra, + response_code = match packet.header.rcode { + 0 => "NOERROR", + 1 => "FORMERR", + 2 => "SERVFAIL", + 3 => "NXDOMAIN", + 4 => "NOTIMP", + 5 => "REFUSED", + _ => "UNKNOWN" + }, + question_count = packet.header.qdcount, + answer_count = packet.header.ancount, + authority_count = packet.header.nscount, + additional_count = packet.header.arcount, + "DNS packet header parsed successfully" + ); + + // Create a DNS response packet + // let response_packet = create_dns_response(packet); + + // Alternative using builder pattern (more flexible): + // let response_packet = response_builder.build_response(&packet); + // + // Or with custom settings and domain: + /* + NOTE: When using the fluent interface with ResponseBuilder, + we need to call at least one with_*_record() method (like with_a_record(), with_aaaa_record(), etc.) to add questions, + otherwise the builder falls back to using the original query's questions + */ + // let mut response_builder = DnsResponseBuilder::new().build_custom_response(&packet); + + // Create a new builder for each request (thread-safe) + let mut dns_response_builder = DnsResponseBuilder::new(); + + let response_builder_fluent = dns_response_builder + .build_custom_response(&packet) + // leave Packet Identifier (ID) intact + .with_qr(true) // Set QR bit to true for response + // Leave Opcode as is (same as request) + .with_authoritative(false) // Set AA bit to false (not authoritative) + // Leave TC bit as is (not truncated) + // Leave RD bit as is (recursion desired) + .with_recursion_available(false) + // Set RA bit to false (recursion not available) + .with_z(0); // Reserved bits set to 0 + // .with_rcode(0) // NOERROR + // NOTE: rcode is 0 (no error) if OPCODE is 0 (standard query) else 4 (not implemented) + // .with_an_answer("", Ipv4Addr::new(1, 1, 1, 1), 3600) + // .build(); + + // Iterate over the questions in the original packet + // and add them to the response packet + // debug!("Processing {} questions", packet.questions.len()); + let mut response_builder_chain = response_builder_fluent; + + for question in packet.questions.iter() { + // `resolve` now returns an Option> + if let Some(ip_addrs) = query_handle.resolve(question.name.clone()).await { + if ip_addrs.is_empty() { + error!("Could not resolve {}: No IPs found", &question.name); + } else { + // Iterate over all returned IP addresses and add them to the response + for ip_addr in ip_addrs { + info!("Resolved {} -> {}", &question.name, ip_addr); + response_builder_chain = response_builder_chain.with_an_answer( + &question.name, + ip_addr, // This is already an IpAddr + 60, + ); + } + } + } else { + error!("Could not resolve {}: Lookup failed", &question.name); + // Optionally, set the RCODE to NXDOMAIN or similar + } + } + + let response_packet = 
response_builder_chain.build(); + // Other examples (commented out): + // Direct domain response: response_builder.build_domain_response("example.com", packet.header.id); + // Multiple domains: response_builder.build_multi_domain_response(&["google.com", "github.com"], packet.header.id); + // Different record types: .with_aaaa_record("ipv6.google.com"), .with_cname_record("www.example.com"), etc. + + // Encode the response packet + let mut response_buf = BytesMut::new(); + match codec.encode(response_packet, &mut response_buf) { + Ok(()) => { + let response_len = sock + .send_to(&response_buf, addr) + .await + .expect("Failed to send DNS response"); + info!("Sent DNS response ({} bytes) to {}", response_len, addr); + } + Err(e) => { + error!("Failed to encode DNS response for {}: {}", addr, e); + // Fallback to echoing original data + let response_len = sock + .send_to(&packet_data[..], addr) + .await + .expect("Failed to send DNS response"); + info!("Fallback: echoed {} bytes back to {}", response_len, addr); + } + } + } + Ok(None) => { + info!("Incomplete packet received from {}, ignoring", addr); + } + Err(e) => { + error!("Failed to decode DNS packet from {}: {}", addr, e); + // Continue processing other packets even if one fails + } + } +}
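A note on the send path in `process_dns_query` above: the spawned task calls `.expect("Failed to send DNS response")`, so a transient socket error aborts that query's task with a panic. A non-panicking sketch of the same step (using the same `sock`, `response_buf`, and `addr` bindings as the surrounding code):

```rust
// Sketch: log and drop the query instead of panicking the task.
match sock.send_to(&response_buf, addr).await {
    Ok(sent) => info!("Sent DNS response ({} bytes) to {}", sent, addr),
    Err(e) => error!("Failed to send DNS response to {}: {}", addr, e),
}
```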