From 152511fd7582d423a598e26f252b6f8695b59de5 Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Tue, 20 Aug 2024 10:44:20 -0700 Subject: [PATCH 1/7] Add support for binding to local port ranges and retrying on EADDRNOTAVAIL --- .bleep | 2 +- pingora-core/src/connectors/l4.rs | 206 +++++++++++++++++++++++++-- pingora-core/src/connectors/mod.rs | 6 +- pingora-core/src/protocols/l4/ext.rs | 98 ++++++++++--- pingora-core/src/upstreams/peer.rs | 12 +- 5 files changed, 286 insertions(+), 38 deletions(-) diff --git a/.bleep b/.bleep index ac4d7909..f403528f 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -28f94f2a402bbf66341bdac8fa670caf5b7311e9 \ No newline at end of file +90c70086397a4708a4dadfed6e6915ce6dc33481 \ No newline at end of file diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 01620343..58bd2099 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -33,8 +33,59 @@ pub trait Connect: std::fmt::Debug { async fn connect(&self, addr: &SocketAddr) -> Result; } +/// Settings for binding on connect +#[derive(Clone, Debug, Default)] +pub struct BindTo { + // local ip address + pub addr: Option, + // port range + port_range: Option<(u16, u16)>, + // whether we fallback and try again on bind errors when a port range is set + fallback: bool, +} + +impl BindTo { + /// Sets the port range we will bind to where the first item in the tuple is the lower bound + /// and the second item is the upper bound. + /// + /// Note this bind option is only supported on Linux since 6.3, this is a no-op on other systems. 
+ /// To reset the range, pass a `None` or `Some((0,0))`, more information can be found [here](https://man7.org/linux/man-pages/man7/ip.7.html) + pub fn set_port_range(&mut self, range: Option<(u16, u16)>) -> Result<()> { + if range.is_none() && self.port_range.is_none() { + // nothing to do + return Ok(()); + } + + match range { + // 0,0 is valid for resets + None | Some((0, 0)) => self.port_range = Some((0, 0)), + // set the port range if valid + Some((low, high)) if low > 0 && low < high => { + self.port_range = Some((low, high)); + } + _ => return Error::e_explain(SocketError, "invalid port range: {range}"), + } + Ok(()) + } + + /// Set whether we fallback on no address available if a port range is set + pub fn set_fallback(&mut self, fallback: bool) { + self.fallback = fallback + } + + /// Configured bind port range + pub fn port_range(&self) -> Option<(u16, u16)> { + self.port_range + } + + /// Whether we attempt to fallback on no address available + pub fn will_fallback(&self) -> bool { + self.fallback && self.port_range.is_some() + } +} + /// Establish a connection (l4) to the given peer using its settings and an optional bind address. -pub(crate) async fn connect

(peer: &P, bind_to: Option) -> Result +pub(crate) async fn connect

(peer: &P, bind_to: Option) -> Result where P: Peer + Send + Sync, { @@ -142,12 +193,8 @@ pub(crate) fn bind_to_random( peer: &P, v4_list: &[InetSocketAddr], v6_list: &[InetSocketAddr], -) -> Option { - let selected = peer.get_peer_options().and_then(|o| o.bind_to); - if selected.is_some() { - return selected; - } - +) -> Option { + // helper function for randomly picking address fn bind_to_ips(ips: &[InetSocketAddr]) -> Option { match ips.len() { 0 => None, @@ -159,13 +206,31 @@ pub(crate) fn bind_to_random( } } - match peer.address() { + let mut bind_to = peer.get_peer_options().and_then(|o| o.bind_to.clone()); + if bind_to.as_ref().map(|b| b.addr).is_some() { + // already have a bind address selected + return bind_to; + } + + let addr = match peer.address() { SocketAddr::Inet(sockaddr) => match sockaddr { InetSocketAddr::V4(_) => bind_to_ips(v4_list), InetSocketAddr::V6(_) => bind_to_ips(v6_list), }, SocketAddr::Unix(_) => None, + }; + + if addr.is_some() { + if let Some(bind_to) = bind_to.as_mut() { + bind_to.addr = addr; + } else { + bind_to = Some(BindTo { + addr, + ..Default::default() + }); + } } + bind_to } use crate::protocols::raw_connect; @@ -238,16 +303,25 @@ mod tests { #[tokio::test] async fn test_conn_error_addr_not_avail() { let peer = HttpPeer::new("127.0.0.1:121".to_string(), false, "".to_string()); - let new_session = connect(&peer, Some("192.0.2.2:0".parse().unwrap())).await; + let addr = "192.0.2.2:0".parse().ok(); + let bind_to = BindTo { + addr, + ..Default::default() + }; + let new_session = connect(&peer, Some(bind_to)).await; assert_eq!(new_session.unwrap_err().etype(), &InternalError) } #[tokio::test] async fn test_conn_error_other() { let peer = HttpPeer::new("240.0.0.1:80".to_string(), false, "".to_string()); // non localhost - + let addr = "127.0.0.1:0".parse().ok(); // create an error: cannot send from src addr: localhost to dst addr: a public IP - let new_session = connect(&peer, Some("127.0.0.1:0".parse().unwrap())).await; + let 
bind_to = BindTo { + addr, + ..Default::default() + }; + let new_session = connect(&peer, Some(bind_to)).await; let error = new_session.unwrap_err(); // XXX: some system will allow the socket to bind and connect without error, only to timeout assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout) @@ -371,4 +445,114 @@ mod tests { assert_eq!(err.etype(), &ConnectionClosed); assert!(!err.retry()); } + + #[cfg(target_os = "linux")] + #[tokio::test(flavor = "multi_thread")] + async fn test_bind_to_port_range_on_connect() { + fn get_ip_local_port_range() -> (u16, u16) { + let path = "/proc/sys/net/ipv4/ip_local_port_range"; + let file = std::fs::read_to_string(path).unwrap(); + let mut parts = file.split_whitespace(); + ( + parts.next().unwrap().parse().unwrap(), + parts.next().unwrap().parse().unwrap(), + ) + } + + // one-off mock server + async fn mock_inet_connect_server() { + use tokio::net::TcpListener; + let listener = TcpListener::bind("127.0.0.1:10020").await.unwrap(); + if let Ok((mut stream, _addr)) = listener.accept().await { + stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await.unwrap(); + // wait a bit so that the client can read + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + + fn in_port_range(session: Stream, lower: u16, upper: u16) -> bool { + let digest = session.get_socket_digest(); + let local_addr = digest + .as_ref() + .and_then(|s| s.local_addr()) + .unwrap() + .as_inet() + .unwrap(); + + // assert range + local_addr.port() >= lower && local_addr.port() <= upper + } + + tokio::spawn(async { + mock_inet_connect_server().await; + }); + // wait for the server to start + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // need to read /proc/sys/net/ipv4/ip_local_port_range for this test to work + // IP_LOCAL_PORT_RANGE clamp only works on ports in /proc/sys/net/ipv4/ip_local_port_range + let (low, _) = get_ip_local_port_range(); + let high = low + 1; + + let peer = 
HttpPeer::new("127.0.0.1:10020".to_string(), false, "".to_string()); + let mut bind_to = BindTo { + addr: "127.0.0.1:0".parse().ok(), + ..Default::default() + }; + bind_to.set_port_range(Some((low, high))).unwrap(); + + let session1 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session1, low, high)); + + // execute more connect() + let session2 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session2, low, high)); + let session3 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session3, low, high)); + + // disabled fallback, should be AddrNotAvailable error + let err = connect(&peer, Some(bind_to.clone())).await.unwrap_err(); + assert_eq!(err.etype(), &InternalError); + + // enable fallback, assert not in port range but successful + bind_to.set_fallback(true); + let session4 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(!in_port_range(session4, low, high)); + + // works without bind IP, shift up to use new ports + let low = low + 2; + let high = low + 1; + let mut bind_to = BindTo::default(); + bind_to.set_port_range(Some((low, high))).unwrap(); + let session5 = connect(&peer, Some(bind_to.clone())).await.unwrap(); + assert!(in_port_range(session5, low, high)); + } + + #[test] + fn test_bind_to_port_ranges() { + let addr = "127.0.0.1:0".parse().ok(); + let mut bind_to = BindTo { + addr, + ..Default::default() + }; + + // None because the previous value was None + bind_to.set_port_range(None).unwrap(); + assert!(bind_to.port_range.is_none()); + + // zeroes are handled + bind_to.set_port_range(Some((0, 0))).unwrap(); + assert_eq!(bind_to.port_range, Some((0, 0))); + + // zeroes because the previous value was Some + bind_to.set_port_range(None).unwrap(); + assert_eq!(bind_to.port_range, Some((0, 0))); + + // low > high is error + assert!(bind_to.set_port_range(Some((2000, 1000))).is_err()); + + // low < high success + bind_to.set_port_range(Some((1000, 
2000))).unwrap(); + assert_eq!(bind_to.port_range, Some((1000, 2000))); + } } diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index 2d4584c2..cbe299f5 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -24,8 +24,8 @@ use crate::server::configuration::ServerConf; use crate::tls::ssl::SslConnector; use crate::upstreams::peer::{Peer, ALPN}; -use l4::connect as l4_connect; pub use l4::Connect as L4Connect; +use l4::{connect as l4_connect, BindTo}; use log::{debug, error, warn}; use offload::OffloadRuntime; use parking_lot::RwLock; @@ -273,7 +273,7 @@ impl TransportConnector { // connection timeout if there is one async fn do_connect( peer: &P, - bind_to: Option, + bind_to: Option, alpn_override: Option, tls_ctx: &SslConnector, ) -> Result { @@ -296,7 +296,7 @@ async fn do_connect( // Perform the actual L4 and tls connection steps with no timeout async fn do_connect_inner( peer: &P, - bind_to: Option, + bind_to: Option, alpn_override: Option, tls_ctx: &SslConnector, ) -> Result { diff --git a/pingora-core/src/protocols/l4/ext.rs b/pingora-core/src/protocols/l4/ext.rs index 5123bdff..4b65f84f 100644 --- a/pingora-core/src/protocols/l4/ext.rs +++ b/pingora-core/src/protocols/l4/ext.rs @@ -27,6 +27,8 @@ use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use tokio::net::{TcpSocket, TcpStream, UnixStream}; +use crate::connectors::l4::BindTo; + /// The (copy of) the kernel struct tcp_info returns #[repr(C)] #[derive(Copy, Clone, Debug)] @@ -160,9 +162,12 @@ fn cvt_linux_error(t: i32) -> io::Result { #[cfg(target_os = "linux")] fn ip_bind_addr_no_port(fd: RawFd, val: bool) -> io::Result<()> { - const IP_BIND_ADDRESS_NO_PORT: i32 = 24; - - set_opt(fd, libc::IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT, val as c_int) + set_opt( + fd, + libc::IPPROTO_IP, + libc::IP_BIND_ADDRESS_NO_PORT, + val as c_int, + ) } #[cfg(not(target_os = "linux"))] @@ -170,6 +175,26 @@ fn ip_bind_addr_no_port(_fd: RawFd, 
_val: bool) -> io::Result<()> { Ok(()) } +/// IP_LOCAL_PORT_RANGE is only supported on Linux 6.3 and higher, +/// ip_local_port_range() is a no-op on unsupported versions. +/// See the [man page](https://man7.org/linux/man-pages/man7/ip.7.html) for more details. +#[cfg(target_os = "linux")] +fn ip_local_port_range(fd: RawFd, low: u16, high: u16) -> io::Result<()> { + const IP_LOCAL_PORT_RANGE: i32 = 51; + let range: u32 = (low as u32) | ((high as u32) << 16); + + let result = set_opt(fd, libc::IPPROTO_IP, IP_LOCAL_PORT_RANGE, range as c_int); + match result { + Err(e) if e.raw_os_error() != Some(libc::ENOPROTOOPT) => Err(e), + _ => Ok(()), // no error or ENOPROTOOPT + } +} + +#[cfg(not(target_os = "linux"))] +fn ip_local_port_range(_fd: RawFd, _low: u16, _high: u16) -> io::Result<()> { + Ok(()) +} + #[cfg(target_os = "linux")] fn set_so_keepalive(fd: RawFd, val: bool) -> io::Result<()> { set_opt(fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, val as c_int) @@ -310,14 +335,42 @@ pub fn get_socket_cookie(_fd: RawFd) -> io::Result { Ok(0) // SO_COOKIE is a Linux concept } -/// connect() to the given address while optionally binding to the specific source address. +/// connect() to the given address while optionally binding to the specific source address and port range. /// /// The `set_socket` callback can be used to tune the socket before `connect()` is called. /// +/// If a [`BindTo`] is set with a port range and fallback setting enabled this function will retry +/// on EADDRNOTAVAIL ignoring the port range. +/// /// `IP_BIND_ADDRESS_NO_PORT` is used. -pub(crate) async fn connect_with Result<()>>( +/// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`]. 
+pub(crate) async fn connect_with Result<()> + Clone>( addr: &SocketAddr, - bind_to: Option<&SocketAddr>, + bind_to: Option<&BindTo>, + set_socket: F, +) -> Result { + if bind_to.as_ref().map_or(false, |b| b.will_fallback()) { + // if we see an EADDRNOTAVAIL error clear the port range and try again + let connect_result = inner_connect_with(addr, bind_to, set_socket.clone()).await; + if let Err(e) = connect_result.as_ref() { + if matches!(e.etype(), BindError) { + let mut new_bind_to = BindTo::default(); + new_bind_to.addr = bind_to.as_ref().and_then(|b| b.addr); + // reset the port range + new_bind_to.set_port_range(None).unwrap(); + return inner_connect_with(addr, Some(&new_bind_to), set_socket).await; + } + } + connect_result + } else { + // not retryable + inner_connect_with(addr, bind_to, set_socket).await + } +} + +async fn inner_connect_with Result<()>>( + addr: &SocketAddr, + bind_to: Option<&BindTo>, set_socket: F, ) -> Result { let socket = if addr.is_ipv4() { @@ -328,14 +381,23 @@ pub(crate) async fn connect_with Result<()>>( .or_err(SocketError, "failed to create socket")?; if cfg!(target_os = "linux") { - ip_bind_addr_no_port(socket.as_raw_fd(), true) - .or_err(SocketError, "failed to set socket opts")?; - - if let Some(baddr) = bind_to { - socket - .bind(*baddr) - .or_err_with(BindError, || format!("failed to bind to socket {}", *baddr))?; - }; + ip_bind_addr_no_port(socket.as_raw_fd(), true).or_err( + SocketError, + "failed to set socket opts IP_BIND_ADDRESS_NO_PORT", + )?; + + if let Some(bind_to) = bind_to { + if let Some((low, high)) = bind_to.port_range() { + ip_local_port_range(socket.as_raw_fd(), low, high) + .or_err(SocketError, "failed to set socket opts IP_LOCAL_PORT_RANGE")?; + } + + if let Some(baddr) = bind_to.addr { + socket + .bind(baddr) + .or_err_with(BindError, || format!("failed to bind to socket {}", baddr))?; + } + } } // TODO: add support for bind on other platforms @@ -349,8 +411,9 @@ pub(crate) async fn connect_with Result<()>>( 
/// connect() to the given address while optionally binding to the specific source address. /// -/// `IP_BIND_ADDRESS_NO_PORT` is used. -pub async fn connect(addr: &SocketAddr, bind_to: Option<&SocketAddr>) -> Result { +/// `IP_BIND_ADDRESS_NO_PORT` is used +/// `IP_LOCAL_PORT_RANGE` is used if a port range is set on [`BindTo`]. +pub async fn connect(addr: &SocketAddr, bind_to: Option<&BindTo>) -> Result { connect_with(addr, bind_to, |_| Ok(())).await } @@ -365,7 +428,8 @@ fn wrap_os_connect_error(e: std::io::Error, context: String) -> Box { match e.kind() { ErrorKind::ConnectionRefused => Error::because(ConnectRefused, context, e), ErrorKind::TimedOut => Error::because(ConnectTimedout, context, e), - ErrorKind::PermissionDenied | ErrorKind::AddrInUse | ErrorKind::AddrNotAvailable => { + ErrorKind::AddrNotAvailable => Error::because(BindError, context, e), + ErrorKind::PermissionDenied | ErrorKind::AddrInUse => { Error::because(InternalError, context, e) } _ => match e.raw_os_error() { diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index fad84712..832791d8 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -29,7 +29,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use crate::connectors::L4Connect; +use crate::connectors::{l4::BindTo, L4Connect}; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::ConnFdReusable; use crate::protocols::TcpKeepalive; @@ -110,8 +110,8 @@ pub trait Peer: Display + Clone { None => None, } } - /// Which local source address this connection should be bind to. - fn bind_to(&self) -> Option<&InetSocketAddr> { + /// Information about the local source address this connection should be bound to. 
+ fn bind_to(&self) -> Option<&BindTo> { match self.get_peer_options() { Some(opt) => opt.bind_to.as_ref(), None => None, @@ -243,7 +243,7 @@ impl Peer for BasicPeer { !self.sni.is_empty() } - fn bind_to(&self) -> Option<&InetSocketAddr> { + fn bind_to(&self) -> Option<&BindTo> { None } @@ -294,7 +294,7 @@ impl Scheme { /// See [`Peer`] for the meaning of the fields #[derive(Clone, Debug)] pub struct PeerOptions { - pub bind_to: Option, + pub bind_to: Option, pub connection_timeout: Option, pub total_connection_timeout: Option, pub read_timeout: Option, @@ -365,7 +365,7 @@ impl PeerOptions { impl Display for PeerOptions { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - if let Some(b) = self.bind_to { + if let Some(b) = self.bind_to.as_ref() { write!(f, "bind_to: {:?},", b)?; } if let Some(t) = self.connection_timeout { From 5ce15b5f68d34be06e891d01bb953470d7300847 Mon Sep 17 00:00:00 2001 From: Matthew Gumport Date: Mon, 26 Aug 2024 23:58:16 -0700 Subject: [PATCH 2/7] unset meta on cache miss If we have a cache miss, any meta in this object is invalid. Unset it so that we don't use it later. 
--- .bleep | 2 +- pingora-cache/src/lib.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.bleep b/.bleep index f403528f..b554c0bd 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -90c70086397a4708a4dadfed6e6915ce6dc33481 \ No newline at end of file +856e8cec6eaf3ad55fd83b85a85591bfd7dbe1dc \ No newline at end of file diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index d1346e50..cdcfcc99 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -499,6 +499,11 @@ impl HttpCache { // from Stale: waited for cache lock, then retried and found asset was gone CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => { self.phase = CachePhase::Miss; + // It's possible that we've set the meta on lookup and have come back around + // here after not being able to acquire the cache lock, and our item has since + // been purged or expired. We should be sure that the meta is not set in this case + // as there shouldn't be a meta set for cache misses. 
+ self.inner_mut().meta = None; self.inner_mut().traces.start_miss_span(); } _ => panic!("wrong phase {:?}", self.phase), From 96f90d7237844df26c13c5361e95d97a2a5a3358 Mon Sep 17 00:00:00 2001 From: Zero King Date: Sun, 18 Aug 2024 10:09:20 +0000 Subject: [PATCH 3/7] Replace non-breaking space with regular space Includes-commit: 0560e0ff05481465e926766ba1d6791c12a96944 Replicated-from: https://github.com/cloudflare/pingora/pull/356 --- .bleep | 2 +- pingora-core/src/upstreams/peer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index b554c0bd..6bd7d8b7 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -856e8cec6eaf3ad55fd83b85a85591bfd7dbe1dc \ No newline at end of file +d4fa38fb7c9b12dd533b95a9d0352e8693dafe14 \ No newline at end of file diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index 832791d8..78eb762e 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -67,7 +67,7 @@ pub trait Peer: Display + Clone { fn tls(&self) -> bool; /// The SNI to send, if TLS is used fn sni(&self) -> &str; - /// To decide whether a [`Peer`] can use the connection established by another [`Peer`]. + /// To decide whether a [`Peer`] can use the connection established by another [`Peer`]. 
/// /// The connections to two peers are considered reusable to each other if their reuse hashes are /// the same From 4e813b1b77847274888febf832a5a2c32f617d1b Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Fri, 30 Aug 2024 16:22:43 -0700 Subject: [PATCH 4/7] docs: fix doc comment for request_summary Co-authored-by: Derek Argueta --- .bleep | 2 +- pingora-core/src/protocols/http/v1/server.rs | 2 +- pingora-core/src/protocols/http/v2/server.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bleep b/.bleep index 6bd7d8b7..04181898 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -d4fa38fb7c9b12dd533b95a9d0352e8693dafe14 \ No newline at end of file +ed4f7c51958b688c6bea000c37dc2ba54f2ccc15 \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index ea99d743..8dca9d49 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -319,7 +319,7 @@ impl HttpSession { .map_or(b"", |h| h.as_bytes()) } - /// Return a string `$METHOD $PATH $HOST`. Mostly for logging and debug purpose + /// Return a string `$METHOD $PATH, Host: $HOST`. Mostly for logging and debug purpose pub fn request_summary(&self) -> String { format!( "{} {}, Host: {}", diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs index 447a4e30..6d6b393b 100644 --- a/pingora-core/src/protocols/http/v2/server.rs +++ b/pingora-core/src/protocols/http/v2/server.rs @@ -344,7 +344,7 @@ impl HttpSession { Ok(end_stream) } - /// Return a string `$METHOD $PATH $HOST`. Mostly for logging and debug purpose + /// Return a string `$METHOD $PATH, Host: $HOST`. 
Mostly for logging and debug purpose pub fn request_summary(&self) -> String { format!( "{} {}, Host: {}:{}", From afa6f5abcd8cfd89895580fb97b69f417acfd7e1 Mon Sep 17 00:00:00 2001 From: Allan <6740989+allan2@users.noreply.github.com> Date: Fri, 23 Aug 2024 15:24:38 +0000 Subject: [PATCH 5/7] Fix `Opt::parse_args` doc typo Includes-commit: 9a934bc8baa4e8b05639f6aa71ca9695832986f0 Replicated-from: https://github.com/cloudflare/pingora/pull/360 --- .bleep | 2 +- pingora-core/src/server/configuration/mod.rs | 2 +- pingora-core/src/server/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bleep b/.bleep index 04181898..59c17856 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -ed4f7c51958b688c6bea000c37dc2ba54f2ccc15 \ No newline at end of file +b0ee94e1ce384cb2d23a40bf6c7a4b7e32541412 \ No newline at end of file diff --git a/pingora-core/src/server/configuration/mod.rs b/pingora-core/src/server/configuration/mod.rs index 5174d407..21428aa6 100644 --- a/pingora-core/src/server/configuration/mod.rs +++ b/pingora-core/src/server/configuration/mod.rs @@ -118,7 +118,7 @@ impl Default for ServerConf { /// Command-line options /// -/// Call `Opt::from_args()` to build this object from the process's command line arguments. +/// Call `Opt::parse_args()` to build this object from the process's command line arguments. #[derive(Parser, Debug, Default)] #[clap(name = "basic", long_about = None)] pub struct Opt { diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index c3659f2a..d9e57ddc 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -201,7 +201,7 @@ impl Server { /// independent services. /// /// Command line options can either be passed by parsing the command line arguments via - /// `Opt::from_args()`, or be generated by other means. + /// `Opt::parse_args()`, or be generated by other means. 
pub fn new(opt: impl Into>) -> Result { let opt = opt.into(); let (tx, rx) = watch::channel(false); From 58c0da7d9a9920b588ac9dce357e4fbba04d85d7 Mon Sep 17 00:00:00 2001 From: Matthew Gumport Date: Tue, 27 Aug 2024 00:31:53 -0700 Subject: [PATCH 6/7] disable caching during error handling This has the effect of reducing a warning log since we are not leaving the lock dangling for the next reader. --- .bleep | 2 +- pingora-proxy/src/lib.rs | 2 ++ pingora-proxy/tests/test_upstream.rs | 3 --- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.bleep b/.bleep index 59c17856..0f44214d 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -b0ee94e1ce384cb2d23a40bf6c7a4b7e32541412 \ No newline at end of file +30c624970d46b07efd110fcfb8dd0b6f9a099e2b \ No newline at end of file diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index f872d82d..890af982 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -607,6 +607,8 @@ impl HttpProxy { }; if let Some(e) = final_error.as_ref() { + // If we have errored and are still holding a cache lock, release it. + session.cache.disable(NoCacheReason::InternalError); let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await; // final error will have > 0 status unless downstream connection is dead diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index c2499f59..02174c10 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -1077,9 +1077,6 @@ mod test_cache { init(); let url = "http://127.0.0.1:6148/sleep/test_cache_lock_network_error.txt"; - // FIXME: Dangling lock happens in this test because the first request aborted without - // properly release the lock. 
This is a bug - let task1 = tokio::spawn(async move { let res = reqwest::Client::new() .get(url) From 1e0e0bcae12a85aebaa209d608bea04a5b01ab23 Mon Sep 17 00:00:00 2001 From: Matthew Gumport Date: Wed, 4 Sep 2024 16:02:29 -0700 Subject: [PATCH 7/7] change lock status memory ordering, tag spans This changes the memory ordering for the lock status load to `SeqCst` from `Relaxed` to eliminate a potential source of panics. Panics had the frames: ``` pingora_proxy::proxy_cache::::handle_lock_status (proxy_cache.rs:748) pingora_proxy::proxy_cache::::proxy_cache::{{closure}} (proxy_cache.rs:211) pingora_proxy::HttpProxy::process_request::{{closure}} (lib.rs:509) pingora_proxy::HttpProxy::process_new_http::{{closure}} (lib.rs:727) ``` which showed we were checking on the status of the lock, after waiting on it, and still seeing its status as waiting. The status is returned by value, so this is not a time-of-check to time-of-use problem, this is an inconsistency in how the lock status is managed. The change in memory order is mostly for the sake of this programmer's attempts to understand what is happening. This also completes a couple of TODOs to limit the wait period as well as tag the span with the lock status. 
--- .bleep | 2 +- pingora-cache/Cargo.toml | 1 + pingora-cache/src/lib.rs | 8 ++++++-- pingora-cache/src/lock.rs | 27 ++++++++++++++++++++------- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/.bleep b/.bleep index 0f44214d..5081a5be 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -30c624970d46b07efd110fcfb8dd0b6f9a099e2b \ No newline at end of file +2351cdf592f9986201d754e6ee1f37f493f69abb \ No newline at end of file diff --git a/pingora-cache/Cargo.toml b/pingora-cache/Cargo.toml index 2dc3f2f7..6ae6c123 100644 --- a/pingora-cache/Cargo.toml +++ b/pingora-cache/Cargo.toml @@ -43,6 +43,7 @@ lru = { workspace = true } ahash = { workspace = true } hex = "0.4" httparse = { workspace = true } +strum = { version = "0.26", features = ["derive"] } [dev-dependencies] tokio-test = "0.4" diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index cdcfcc99..f1f8bd21 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -21,6 +21,7 @@ use key::{CacheHashKey, HashBinary}; use lock::WritePermit; use pingora_error::Result; use pingora_http::ResponseHeader; +use rustracing::tag::Tag; use std::time::{Duration, Instant, SystemTime}; use trace::CacheTraceCTX; @@ -1047,7 +1048,7 @@ impl HttpCache { /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock. 
pub async fn cache_lock_wait(&mut self) -> LockStatus { let inner = self.inner_mut(); - let _span = inner.traces.child("cache_lock"); + let mut span = inner.traces.child("cache_lock"); let lock = inner.lock.take(); // remove the lock from self if let Some(Locked::Read(r)) = lock { let now = Instant::now(); @@ -1059,7 +1060,10 @@ impl HttpCache { .lock_duration .map_or(lock_duration, |d| d + lock_duration), ); - r.lock_status() // TODO: tag the span with lock status + let status = r.lock_status(); + let tag_value: &'static str = status.into(); + span.set_tag(|| Tag::new("status", tag_value)); + status } else { // should always call is_cache_locked() before this function panic!("cache_lock_wait on wrong type of lock") diff --git a/pingora-cache/src/lock.rs b/pingora-cache/src/lock.rs index 7f50691b..8853cb17 100644 --- a/pingora-cache/src/lock.rs +++ b/pingora-cache/src/lock.rs @@ -100,12 +100,14 @@ impl CacheLock { } } +use log::warn; use std::sync::atomic::{AtomicU8, Ordering}; use std::time::{Duration, Instant}; +use strum::IntoStaticStr; use tokio::sync::Semaphore; /// Status which the read locks could possibly see. 
-#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)] pub enum LockStatus { /// Waiting for the writer to populate the asset Waiting, @@ -180,7 +182,7 @@ impl LockCore { } fn lock_status(&self) -> LockStatus { - self.lock_status.load(Ordering::Relaxed).into() + self.lock_status.load(Ordering::SeqCst).into() } } @@ -197,11 +199,22 @@ impl ReadLock { return; } - // TODO: should subtract now - start so that the lock don't wait beyond start + timeout - // Also need to be careful not to wake everyone up at the same time + // TODO: need to be careful not to wake everyone up at the same time // (maybe not an issue because regular cache lock release behaves that way) - let _ = timeout(self.0.timeout, self.0.lock.acquire()).await; - // permit is returned to Semaphore right away + if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) { + match timeout(duration, self.0.lock.acquire()).await { + Ok(Ok(_)) => { // permit is returned to Semaphore right away + } + Ok(Err(e)) => { + warn!("error acquiring semaphore {e:?}") + } + Err(_) => { + self.0 + .lock_status + .store(LockStatus::Timeout.into(), Ordering::SeqCst); + } + } + } } /// Test if it is still locked @@ -211,7 +224,7 @@ impl ReadLock { /// Whether the lock is expired, e.g., the writer has been holding the lock for too long pub fn expired(&self) -> bool { - // NOTE: this whether the lock is currently expired + // NOTE: this is whether the lock is currently expired // not whether it was timed out during wait() self.0.lock_start.elapsed() >= self.0.timeout }