From 2853309dcdbf7772457cde2d61fa2286e363ae0e Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Fri, 30 Aug 2024 09:41:09 -0700 Subject: [PATCH 1/4] Windows support 2/n: Support FD types on different platforms --- .bleep | 2 +- pingora-core/src/connectors/http/v2.rs | 10 +++--- pingora-core/src/protocols/http/v1/client.rs | 4 +-- pingora-core/src/protocols/http/v2/client.rs | 8 ++--- pingora-core/src/protocols/l4/stream.rs | 4 +-- pingora-core/src/protocols/mod.rs | 38 +++++++++++++++++--- pingora-core/src/protocols/tls/mod.rs | 4 +-- 7 files changed, 49 insertions(+), 21 deletions(-) diff --git a/.bleep b/.bleep index 5081a5be..f71cb64a 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -2351cdf592f9986201d754e6ee1f37f493f69abb \ No newline at end of file +bea67a70dff1b8a8a04d46b0c322e8fac1120d0b \ No newline at end of file diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index 433bc4bb..60e26fb6 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -16,7 +16,7 @@ use super::HttpSession; use crate::connectors::{ConnectorOptions, TransportConnector}; use crate::protocols::http::v1::client::HttpSession as Http1Session; use crate::protocols::http::v2::client::{drive_connection, Http2Session}; -use crate::protocols::{Digest, Stream}; +use crate::protocols::{Digest, Stream, UniqueIDType}; use crate::upstreams::peer::{Peer, ALPN}; use bytes::Bytes; @@ -47,7 +47,7 @@ pub(crate) struct ConnectionRefInner { connection_stub: Stub, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, // max concurrent streams this connection is allowed to create max_streams: usize, // how many concurrent streams already active @@ -69,7 +69,7 @@ impl ConnectionRef { send_req: SendRequest, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, max_streams: usize, digest: Digest, ) -> Self { @@ -98,7 +98,7 @@ impl ConnectionRef { self.0.current_streams.fetch_sub(1, Ordering::SeqCst); } - pub fn id(&self) -> i32 { + pub fn id(&self) -> UniqueIDType { self.0.id } @@ -196,7 +196,7 @@ impl InUsePool { // release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist) // the caller should update the ref and then decide where to put it (in use pool or idle) - fn release(&self, reuse_hash: u64, id: i32) -> Option { + fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option { let pools = self.pools.read(); if let Some(pool) = pools.get(&reuse_hash) { pool.remove(id) diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 8c2ab14e..2b2640bc 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::HttpTask; -use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; +use crate::protocols::{Digest, SocketAddr, Stream, UniqueID, UniqueIDType}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x client session @@ -717,7 +717,7 @@ pub(crate) fn http_req_header_to_wire(req: &RequestHeader) -> Option { } impl UniqueID for HttpSession { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.underlying_stream.id() } } diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 9bdbff46..1d89004a 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch; use crate::connectors::http::v2::ConnectionRef; -use crate::protocols::{Digest, SocketAddr}; +use crate::protocols::{Digest, SocketAddr, UniqueIDType}; pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); @@ -336,7 +336,7 @@ impl Http2Session { } /// the FD of the underlying connection - pub fn fd(&self) -> i32 { + pub fn fd(&self) -> UniqueIDType { self.conn.id() } @@ -427,7 +427,7 @@ use tokio::sync::oneshot; pub async fn drive_connection( mut c: client::Connection, - id: i32, + id: UniqueIDType, closed: watch::Sender, ping_interval: Option, ping_timeout_occurred: Arc, @@ -481,7 +481,7 @@ async fn do_ping_pong( interval: Duration, tx: oneshot::Sender<()>, dropped: Arc, - id: i32, + id: UniqueIDType, ) { // delay before sending the first ping, no need to race with the first request tokio::time::sleep(interval).await; diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index ee91a8aa..edcb188e 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -30,7 +30,7 @@ use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive}; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{ GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, - UniqueID, + UniqueID, UniqueIDType, }; use crate::upstreams::peer::Tracer; @@ -202,7 +202,7 @@ impl AsRawFd for Stream { } impl UniqueID for Stream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.as_raw_fd() } } diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index fb30992a..4c1aa88c 100644 --- a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -32,6 +32,11 @@ use std::fmt::Debug; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; +#[cfg(unix)] +pub type UniqueIDType = i32; +#[cfg(windows)] +pub type UniqueIDType = usize; + /// Define how a protocol should shutdown its connection. #[async_trait] pub trait Shutdown { @@ -42,7 +47,7 @@ pub trait Shutdown { pub trait UniqueID { /// The ID returned should be unique among all existing connections of the same type. /// But ID can be recycled after a connection is shutdown. - fn id(&self) -> i32; + fn id(&self) -> UniqueIDType; } /// Interface to get TLS info @@ -126,7 +131,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Mock { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -154,7 +159,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Cursor { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -182,7 +187,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for DuplexStream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -204,15 +209,27 @@ mod ext_io_impl { } } +#[cfg(unix)] pub(crate) trait ConnFdReusable { fn check_fd_match(&self, fd: V) -> bool; } +#[cfg(windows)] +pub(crate) trait ConnSockReusable { + fn check_sock_match(&self, sock: V) -> bool; +} + use l4::socket::SocketAddr; use log::{debug, error}; +#[cfg(unix)] use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr}; -use std::{net::SocketAddr as InetSocketAddr, os::unix::prelude::AsRawFd, path::Path}; +#[cfg(unix)] +use std::os::unix::prelude::AsRawFd; +#[cfg(windows)] +use std::os::windows::io::AsRawSocket; +use std::{net::SocketAddr as InetSocketAddr, path::Path}; +#[cfg(unix)] impl ConnFdReusable for SocketAddr { fn check_fd_match(&self, fd: V) -> bool { match self { @@ -225,6 +242,16 @@ impl ConnFdReusable for SocketAddr { } } +#[cfg(windows)] +impl ConnSockReusable for SocketAddr { + fn check_sock_match(&self, sock: V) -> bool { + match self { + SocketAddr::Inet(addr) => addr.check_sock_match(sock), + } + } +} + +#[cfg(unix)] impl ConnFdReusable for Path { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); @@ -252,6 +279,7 @@ impl ConnFdReusable for Path { } } +#[cfg(unix)] impl ConnFdReusable for InetSocketAddr { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); diff --git a/pingora-core/src/protocols/tls/mod.rs b/pingora-core/src/protocols/tls/mod.rs index ca353c5b..89fe0d38 100644 --- a/pingora-core/src/protocols/tls/mod.rs +++ b/pingora-core/src/protocols/tls/mod.rs @@ -26,7 +26,7 @@ pub use boringssl_openssl::*; pub mod dummy_tls; use crate::protocols::digest::TimingDigest; -use crate::protocols::{Ssl, UniqueID}; +use crate::protocols::{Ssl, UniqueID, UniqueIDType}; use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl}; use log::warn; use pingora_error::{ErrorType::*, OrErr, Result}; @@ -169,7 +169,7 @@ impl UniqueID for SslStream where T: UniqueID, { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.ssl.get_ref().id() } } From 22361b7a28f55d4b8d2f9a6001ea1cd16ea479cc Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Fri, 30 Aug 2024 20:08:59 -0700 Subject: [PATCH 2/4] Allow miss handler to lookup storage using streaming write tag When supporting streaming partial writes, the cache miss request may not always find the corresponding body reader for the storage asset that its upstream write is trying to fill. This change adds a lookup_streaming_write storage API and allows the miss handler to provide a streaming write tag for use with the API, so that it may find the matching hit handler to serve downstream. --- .bleep | 2 +- pingora-cache/src/lib.rs | 10 +- pingora-cache/src/max_file_size.rs | 4 + pingora-cache/src/memory.rs | 96 +++++++++++++--- pingora-cache/src/storage.rs | 103 +++++++++++++++++- pingora-proxy/tests/test_upstream.rs | 37 +++++++ .../tests/utils/conf/origin/conf/nginx.conf | 3 +- pingora-proxy/tests/utils/server_utils.rs | 13 +++ 8 files changed, 246 insertions(+), 22 deletions(-) diff --git a/.bleep b/.bleep index f71cb64a..95ee21df 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -bea67a70dff1b8a8a04d46b0c322e8fac1120d0b \ No newline at end of file +b5675546711d1fc8bc1a0aa28e4586e46d560024 \ No newline at end of file diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index f1f8bd21..8f88299a 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -612,7 +612,15 @@ impl HttpCache { // Downstream read and upstream write can be decoupled let body_reader = inner .storage - .lookup(key, &inner.traces.get_miss_span()) + .lookup_streaming_write( + key, + inner + .miss_handler + .as_ref() + .expect("miss handler already set") + .streaming_write_tag(), + &inner.traces.get_miss_span(), + ) .await?; if let Some((_meta, body_reader)) = body_reader { diff --git a/pingora-cache/src/max_file_size.rs b/pingora-cache/src/max_file_size.rs index 7d812f23..72caefa4 100644 --- a/pingora-cache/src/max_file_size.rs +++ b/pingora-cache/src/max_file_size.rs @@ -72,4 +72,8 @@ impl HandleMiss for MaxFileSizeMissHandler { async fn finish(self: Box) -> pingora_error::Result { self.inner.finish().await } + + fn streaming_write_tag(&self) -> Option<&[u8]> { + self.inner.streaming_write_tag() + } } diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs index dec8f1ad..92831107 100644 --- a/pingora-cache/src/memory.rs +++ b/pingora-cache/src/memory.rs @@ -20,7 +20,7 @@ use super::*; use crate::key::CompactCacheKey; -use crate::storage::{HandleHit, HandleMiss}; +use crate::storage::{streaming_write::U64WriteId, HandleHit, HandleMiss}; use crate::trace::SpanHandle; use async_trait::async_trait; @@ -29,6 +29,7 @@ use parking_lot::RwLock; use pingora_error::*; use std::any::Any; use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::watch; @@ -68,7 +69,8 @@ impl TempObject { /// For testing only, not for production use. pub struct MemCache { pub(crate) cached: Arc>>, - pub(crate) temp: Arc>>, + pub(crate) temp: Arc>>>, + pub(crate) last_temp_id: AtomicU64, } impl MemCache { @@ -77,6 +79,7 @@ impl MemCache { MemCache { cached: Arc::new(RwLock::new(HashMap::new())), temp: Arc::new(RwLock::new(HashMap::new())), + last_temp_id: AtomicU64::new(0), } } } @@ -213,8 +216,11 @@ pub struct MemMissHandler { bytes_written: Arc>, // these are used only in finish() to data from temp to cache key: String, + temp_id: U64WriteId, + // key -> cache object cache: Arc>>, - temp: Arc>>, + // key -> (temp writer id -> temp object) to support concurrent writers + temp: Arc>>>, } #[async_trait] @@ -237,20 +243,48 @@ impl HandleMiss for MemMissHandler { async fn finish(self: Box) -> Result { // safe, the temp object is inserted when the miss handler is created - let cache_object = self.temp.read().get(&self.key).unwrap().make_cache_object(); + let cache_object = self + .temp + .read() + .get(&self.key) + .unwrap() + .get(&self.temp_id.into()) + .unwrap() + .make_cache_object(); let size = cache_object.body.len(); // FIXME: this just body size, also track meta size self.cache.write().insert(self.key.clone(), cache_object); - self.temp.write().remove(&self.key); + self.temp + .write() + .get_mut(&self.key) + .and_then(|map| map.remove(&self.temp_id.into())); Ok(size) } + + fn streaming_write_tag(&self) -> Option<&[u8]> { + Some(self.temp_id.as_bytes()) + } } impl Drop for MemMissHandler { fn drop(&mut self) { - self.temp.write().remove(&self.key); + self.temp + .write() + .get_mut(&self.key) + .and_then(|map| map.remove(&self.temp_id.into())); } } +fn hit_from_temp_obj(temp_obj: &TempObject) -> Result> { + let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?; + let partial = PartialHit { + body: temp_obj.body.clone(), + bytes_written: temp_obj.bytes_written.subscribe(), + bytes_read: 0, + }; + let hit_handler = MemHitHandler::Partial(partial); + Ok(Some((meta, Box::new(hit_handler)))) +} + #[async_trait] impl Storage for MemCache { async fn lookup( @@ -261,15 +295,14 @@ impl Storage for MemCache { let hash = key.combined(); // always prefer partial read otherwise fresh asset will not be visible on expired asset // until it is fully updated - if let Some(temp_obj) = self.temp.read().get(&hash) { - let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?; - let partial = PartialHit { - body: temp_obj.body.clone(), - bytes_written: temp_obj.bytes_written.subscribe(), - bytes_read: 0, - }; - let hit_handler = MemHitHandler::Partial(partial); - Ok(Some((meta, Box::new(hit_handler)))) + // no preference on which partial read we get (if there are multiple writers) + if let Some((_, temp_obj)) = self + .temp + .read() + .get(&hash) + .and_then(|map| map.iter().next()) + { + hit_from_temp_obj(temp_obj) } else if let Some(obj) = self.cached.read().get(&hash) { let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?; let hit_handler = CompleteHit { @@ -285,24 +318,49 @@ impl Storage for MemCache { } } + async fn lookup_streaming_write( + &'static self, + key: &CacheKey, + streaming_write_tag: Option<&[u8]>, + _trace: &SpanHandle, + ) -> Result> { + let hash = key.combined(); + let write_tag: U64WriteId = streaming_write_tag + .expect("tag must be set during streaming write") + .try_into() + .expect("tag must be correct length"); + hit_from_temp_obj( + self.temp + .read() + .get(&hash) + .and_then(|map| map.get(&write_tag.into())) + .expect("must have partial write in progress"), + ) + } + async fn get_miss_handler( &'static self, key: &CacheKey, meta: &CacheMeta, _trace: &SpanHandle, ) -> Result { - // TODO: support multiple concurrent writes or panic if the is already a writer let hash = key.combined(); let meta = meta.serialize()?; let temp_obj = TempObject::new(meta); + let temp_id = self.last_temp_id.fetch_add(1, Ordering::Relaxed); let miss_handler = MemMissHandler { body: temp_obj.body.clone(), bytes_written: temp_obj.bytes_written.clone(), key: hash.clone(), cache: self.cached.clone(), temp: self.temp.clone(), + temp_id: temp_id.into(), }; - self.temp.write().insert(hash, temp_obj); + self.temp + .write() + .entry(hash) + .or_default() + .insert(miss_handler.temp_id.into(), temp_obj); Ok(Box::new(miss_handler)) } @@ -526,7 +584,9 @@ mod test { ); let temp_obj = TempObject::new(meta); - cache.temp.write().insert(hash.clone(), temp_obj); + let mut map = HashMap::new(); + map.insert(0, temp_obj); + cache.temp.write().insert(hash.clone(), map); assert!(cache.temp.read().contains_key(&hash)); diff --git a/pingora-cache/src/storage.rs b/pingora-cache/src/storage.rs index 0d4e1041..7cf1e7b3 100644 --- a/pingora-cache/src/storage.rs +++ b/pingora-cache/src/storage.rs @@ -36,13 +36,34 @@ pub enum PurgeType { pub trait Storage { // TODO: shouldn't have to be static - /// Lookup the storage for the given [CacheKey] + /// Lookup the storage for the given [CacheKey]. async fn lookup( &'static self, key: &CacheKey, trace: &SpanHandle, ) -> Result>; + /// Lookup the storage for the given [CacheKey] using a streaming write tag. + /// + /// When streaming partial writes is supported, the request that initiates the write will also + /// pass an optional `streaming_write_tag` so that the storage may try to find the associated + /// [HitHandler], for the same ongoing write. + /// + /// Therefore, when the write tag is set, the storage implementation should either return a + /// [HitHandler] that can be matched to that tag, or none at all. Otherwise when the storage + /// supports concurrent streaming writes for the same key, the calling request may receive a + /// different body from the one it expected. + /// + /// By default this defers to the standard `Storage::lookup` implementation. + async fn lookup_streaming_write( + &'static self, + key: &CacheKey, + _streaming_write_tag: Option<&[u8]>, + trace: &SpanHandle, + ) -> Result> { + self.lookup(key, trace).await + } + /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later. async fn get_miss_handler( &'static self, @@ -130,7 +151,87 @@ pub trait HandleMiss { async fn finish( self: Box, // because self is always used as a trait object ) -> Result; + + /// Return a streaming write tag recognized by the underlying [`Storage`]. + /// + /// This is an arbitrary data identifier that is used to associate this miss handler's current + /// write with a hit handler for the same write. This identifier will be compared by the + /// storage during `lookup_streaming_write`. + // This write tag is essentially an borrowed data blob of bytes retrieved from the miss handler + // and passed to storage, which means it can support strings or small data types, e.g. bytes + // represented by a u64. + // The downside with the current API is that such a data blob must be owned by the miss handler + // and stored in a way that permits retrieval as a byte slice (not computed on the fly). + // But most use cases likely only require a simple integer and may not like the overhead of a + // Vec/String allocation or even a Cow, though such data types can also be used here. + fn streaming_write_tag(&self) -> Option<&[u8]> { + None + } } /// Miss Handler pub type MissHandler = Box<(dyn HandleMiss + Sync + Send)>; + +pub mod streaming_write { + /// Portable u64 (sized) write id convenience type for use with streaming writes. + /// + /// Often an integer value is sufficient for a streaming write tag. This convenience type enables + /// storing such a value and functions for consistent conversion between byte sequence data types. + #[derive(Debug, Clone, Copy)] + pub struct U64WriteId([u8; 8]); + + impl U64WriteId { + pub fn as_bytes(&self) -> &[u8] { + &self.0[..] + } + } + + impl From for U64WriteId { + fn from(value: u64) -> U64WriteId { + U64WriteId(value.to_be_bytes()) + } + } + impl From for u64 { + fn from(value: U64WriteId) -> u64 { + u64::from_be_bytes(value.0) + } + } + impl TryFrom<&[u8]> for U64WriteId { + type Error = std::array::TryFromSliceError; + + fn try_from(value: &[u8]) -> std::result::Result { + Ok(U64WriteId(value.try_into()?)) + } + } + + /// Portable u32 (sized) write id convenience type for use with streaming writes. + /// + /// Often an integer value is sufficient for a streaming write tag. This convenience type enables + /// storing such a value and functions for consistent conversion between byte sequence data types. + #[derive(Debug, Clone, Copy)] + pub struct U32WriteId([u8; 4]); + + impl U32WriteId { + pub fn as_bytes(&self) -> &[u8] { + &self.0[..] + } + } + + impl From for U32WriteId { + fn from(value: u32) -> U32WriteId { + U32WriteId(value.to_be_bytes()) + } + } + impl From for u32 { + fn from(value: U32WriteId) -> u32 { + u32::from_be_bytes(value.0) + } + } + impl TryFrom<&[u8]> for U32WriteId { + type Error = std::array::TryFromSliceError; + + fn try_from(value: &[u8]) -> std::result::Result { + Ok(U32WriteId(value.try_into()?)) + } + } +} diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 02174c10..42319a39 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -1516,6 +1516,43 @@ mod test_cache { task3.await.unwrap(); } + #[tokio::test] + async fn test_cache_streaming_multiple_writers() { + // multiple streaming writers don't conflict + init(); + let url = "http://127.0.0.1:6148/slow_body/test_cache_streaming_multiple_writers.txt"; + let task1 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + .header("x-set-hello", "everyone") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello everyone!"); + }); + + let task2 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + // don't allow using the other streaming write's result + .header("x-force-expire", "1") + .header("x-set-hello", "todo el mundo") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello todo el mundo!"); + }); + + task1.await.unwrap(); + task2.await.unwrap(); + } + #[tokio::test] async fn test_range_request() { init(); diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf index a41a743d..6d5abd73 100644 --- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -408,12 +408,13 @@ http { location /slow_body { content_by_lua_block { local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1 + local hello_to = ngx.var.http_x_set_hello or "world" ngx.flush() ngx.sleep(sleep_sec) ngx.print("hello ") ngx.flush() ngx.sleep(sleep_sec) - ngx.print("world") + ngx.print(hello_to) ngx.sleep(sleep_sec) ngx.print("!") } diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 7db26229..f90a27e9 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -399,6 +399,19 @@ impl ProxyHttp for ExampleProxyCache { Ok(()) } + async fn cache_hit_filter( + &self, + session: &Session, + _meta: &CacheMeta, + _ctx: &mut Self::CTX, + ) -> Result { + // allow test header to control force expiry + if session.get_header_bytes("x-force-expire") != b"" { + return Ok(true); + } + Ok(false) + } + fn cache_vary_filter( &self, meta: &CacheMeta, From ac88ddd9541b5f6ac6832b666a420dee8100f89e Mon Sep 17 00:00:00 2001 From: James Yang <26634873@qq.com> Date: Mon, 2 Sep 2024 09:11:22 +0000 Subject: [PATCH 3/4] fix(proxy): typo in example Includes-commit: 9b1d7c544d45f88db55cc9ac118a8f60e8ef84b9 Replicated-from: https://github.com/cloudflare/pingora/pull/371 --- .bleep | 2 +- pingora-proxy/examples/gateway.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index 95ee21df..ae7e1e2c 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -b5675546711d1fc8bc1a0aa28e4586e46d560024 \ No newline at end of file +265d5245705c6f762c7dd37536f9d1c61d2d8af9 \ No newline at end of file diff --git a/pingora-proxy/examples/gateway.rs b/pingora-proxy/examples/gateway.rs index 0bd53306..78f4aae9 100644 --- a/pingora-proxy/examples/gateway.rs +++ b/pingora-proxy/examples/gateway.rs @@ -121,7 +121,7 @@ fn main() { let mut my_proxy = pingora_proxy::http_proxy_service( &my_server.configuration, MyGateway { - req_metric: register_int_counter!("reg_counter", "Number of requests").unwrap(), + req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(), }, ); my_proxy.add_tcp("0.0.0.0:6191"); From 88bd91679993f18038bd1dbc974822643324ee56 Mon Sep 17 00:00:00 2001 From: spacewander Date: Mon, 2 Sep 2024 13:57:21 +0000 Subject: [PATCH 4/4] document early_request_filter Fix #363 --- updated Includes-commit: 204ddb02743445e6204858b4df3ea3f28fa1911c Includes-commit: 34c67f0acae52ce94a3a6d5696e0847e63b42be7 Replicated-from: https://github.com/cloudflare/pingora/pull/368 Signed-off-by: spacewander --- .bleep | 2 +- docs/user_guide/phase.md | 8 ++++++-- docs/user_guide/phase_chart.md | 3 ++- pingora-proxy/src/proxy_trait.rs | 6 +++--- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/.bleep b/.bleep index ae7e1e2c..2835ccc3 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -265d5245705c6f762c7dd37536f9d1c61d2d8af9 \ No newline at end of file +6923a7f31ed6fff1ddaf00ffb8dd56311042c395 \ No newline at end of file diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index a3264968..46e9b5db 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -16,7 +16,8 @@ The pingora-proxy HTTP proxy framework supports highly programmable proxy behavi Pingora-proxy allows users to insert arbitrary logic into the life of a request. ```mermaid graph TD; - start("new request")-->request_filter; + start("new request")-->early_request_filter; + early_request_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; @@ -52,9 +53,12 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request. * The reason both `upstream_response_*_filter()` and `response_*_filter()` exist is for HTTP caching integration reasons (still WIP). -### `request_filter()` +### `early_request_filter()` This is the first phase of every request. +This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. + +### `request_filter()` This phase is usually for validating request inputs, rate limiting, and initializing context. ### `proxy_upstream_filter()` diff --git a/docs/user_guide/phase_chart.md b/docs/user_guide/phase_chart.md index a7d01d43..a6b6a4e1 100644 --- a/docs/user_guide/phase_chart.md +++ b/docs/user_guide/phase_chart.md @@ -1,7 +1,8 @@ Pingora proxy phases without caching ```mermaid graph TD; - start("new request")-->request_filter; + start("new request")-->early_request_filter; + early_request_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 2d99567e..029ea58f 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -69,9 +69,9 @@ pub trait ProxyHttp { /// Handle the incoming request before any downstream module is executed. /// - /// This function is similar to [Self::request_filter()] but execute before any other logic - /// especially the downstream modules. The main purpose of this function is to provide finer - /// grained control of behavior of the modules. + /// This function is similar to [Self::request_filter()] but executes before any other logic, + /// including downstream module logic. The main purpose of this function is to provide finer + /// grained control of the behavior of the modules. /// /// Note that because this function is executed before any module that might provide access /// control or rate limiting, logic should stay in request_filter() if it can in order to be