diff --git a/.bleep b/.bleep index 5081a5be..2835ccc3 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -2351cdf592f9986201d754e6ee1f37f493f69abb \ No newline at end of file +6923a7f31ed6fff1ddaf00ffb8dd56311042c395 \ No newline at end of file diff --git a/docs/user_guide/phase.md b/docs/user_guide/phase.md index a3264968..46e9b5db 100644 --- a/docs/user_guide/phase.md +++ b/docs/user_guide/phase.md @@ -16,7 +16,8 @@ The pingora-proxy HTTP proxy framework supports highly programmable proxy behavi Pingora-proxy allows users to insert arbitrary logic into the life of a request. ```mermaid graph TD; - start("new request")-->request_filter; + start("new request")-->early_request_filter; + early_request_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; @@ -52,9 +53,12 @@ Pingora-proxy allows users to insert arbitrary logic into the life of a request. * The reason both `upstream_response_*_filter()` and `response_*_filter()` exist is for HTTP caching integration reasons (still WIP). -### `request_filter()` +### `early_request_filter()` This is the first phase of every request. +This function is similar to `request_filter()` but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules. + +### `request_filter()` This phase is usually for validating request inputs, rate limiting, and initializing context. ### `proxy_upstream_filter()` diff --git a/docs/user_guide/phase_chart.md b/docs/user_guide/phase_chart.md index a7d01d43..a6b6a4e1 100644 --- a/docs/user_guide/phase_chart.md +++ b/docs/user_guide/phase_chart.md @@ -1,7 +1,8 @@ Pingora proxy phases without caching ```mermaid graph TD; - start("new request")-->request_filter; + start("new request")-->early_request_filter; + early_request_filter-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs index f1f8bd21..8f88299a 100644 --- a/pingora-cache/src/lib.rs +++ b/pingora-cache/src/lib.rs @@ -612,7 +612,15 @@ impl HttpCache { // Downstream read and upstream write can be decoupled let body_reader = inner .storage - .lookup(key, &inner.traces.get_miss_span()) + .lookup_streaming_write( + key, + inner + .miss_handler + .as_ref() + .expect("miss handler already set") + .streaming_write_tag(), + &inner.traces.get_miss_span(), + ) .await?; if let Some((_meta, body_reader)) = body_reader { diff --git a/pingora-cache/src/max_file_size.rs b/pingora-cache/src/max_file_size.rs index 7d812f23..72caefa4 100644 --- a/pingora-cache/src/max_file_size.rs +++ b/pingora-cache/src/max_file_size.rs @@ -72,4 +72,8 @@ impl HandleMiss for MaxFileSizeMissHandler { async fn finish(self: Box) -> pingora_error::Result { self.inner.finish().await } + + fn streaming_write_tag(&self) -> Option<&[u8]> { + self.inner.streaming_write_tag() + } } diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs index dec8f1ad..92831107 100644 --- a/pingora-cache/src/memory.rs +++ b/pingora-cache/src/memory.rs @@ -20,7 +20,7 @@ use super::*; use crate::key::CompactCacheKey; -use crate::storage::{HandleHit, HandleMiss}; +use crate::storage::{streaming_write::U64WriteId, HandleHit, HandleMiss}; use crate::trace::SpanHandle; use async_trait::async_trait; @@ -29,6 +29,7 @@ use parking_lot::RwLock; use pingora_error::*; use std::any::Any; use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::watch; @@ -68,7 +69,8 @@ impl TempObject { /// For testing only, not for production use. pub struct MemCache { pub(crate) cached: Arc>>, - pub(crate) temp: Arc>>, + pub(crate) temp: Arc>>>, + pub(crate) last_temp_id: AtomicU64, } impl MemCache { @@ -77,6 +79,7 @@ impl MemCache { MemCache { cached: Arc::new(RwLock::new(HashMap::new())), temp: Arc::new(RwLock::new(HashMap::new())), + last_temp_id: AtomicU64::new(0), } } } @@ -213,8 +216,11 @@ pub struct MemMissHandler { bytes_written: Arc>, // these are used only in finish() to data from temp to cache key: String, + temp_id: U64WriteId, + // key -> cache object cache: Arc>>, - temp: Arc>>, + // key -> (temp writer id -> temp object) to support concurrent writers + temp: Arc>>>, } #[async_trait] @@ -237,20 +243,48 @@ impl HandleMiss for MemMissHandler { async fn finish(self: Box) -> Result { // safe, the temp object is inserted when the miss handler is created - let cache_object = self.temp.read().get(&self.key).unwrap().make_cache_object(); + let cache_object = self + .temp + .read() + .get(&self.key) + .unwrap() + .get(&self.temp_id.into()) + .unwrap() + .make_cache_object(); let size = cache_object.body.len(); // FIXME: this just body size, also track meta size self.cache.write().insert(self.key.clone(), cache_object); - self.temp.write().remove(&self.key); + self.temp + .write() + .get_mut(&self.key) + .and_then(|map| map.remove(&self.temp_id.into())); Ok(size) } + + fn streaming_write_tag(&self) -> Option<&[u8]> { + Some(self.temp_id.as_bytes()) + } } impl Drop for MemMissHandler { fn drop(&mut self) { - self.temp.write().remove(&self.key); + self.temp + .write() + .get_mut(&self.key) + .and_then(|map| map.remove(&self.temp_id.into())); } } +fn hit_from_temp_obj(temp_obj: &TempObject) -> Result> { + let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?; + let partial = PartialHit { + body: temp_obj.body.clone(), + bytes_written: temp_obj.bytes_written.subscribe(), + bytes_read: 0, + }; + let hit_handler = MemHitHandler::Partial(partial); + Ok(Some((meta, Box::new(hit_handler)))) +} + #[async_trait] impl Storage for MemCache { async fn lookup( @@ -261,15 +295,14 @@ impl Storage for MemCache { let hash = key.combined(); // always prefer partial read otherwise fresh asset will not be visible on expired asset // until it is fully updated - if let Some(temp_obj) = self.temp.read().get(&hash) { - let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?; - let partial = PartialHit { - body: temp_obj.body.clone(), - bytes_written: temp_obj.bytes_written.subscribe(), - bytes_read: 0, - }; - let hit_handler = MemHitHandler::Partial(partial); - Ok(Some((meta, Box::new(hit_handler)))) + // no preference on which partial read we get (if there are multiple writers) + if let Some((_, temp_obj)) = self + .temp + .read() + .get(&hash) + .and_then(|map| map.iter().next()) + { + hit_from_temp_obj(temp_obj) } else if let Some(obj) = self.cached.read().get(&hash) { let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?; let hit_handler = CompleteHit { @@ -285,24 +318,49 @@ impl Storage for MemCache { } } + async fn lookup_streaming_write( + &'static self, + key: &CacheKey, + streaming_write_tag: Option<&[u8]>, + _trace: &SpanHandle, + ) -> Result> { + let hash = key.combined(); + let write_tag: U64WriteId = streaming_write_tag + .expect("tag must be set during streaming write") + .try_into() + .expect("tag must be correct length"); + hit_from_temp_obj( + self.temp + .read() + .get(&hash) + .and_then(|map| map.get(&write_tag.into())) + .expect("must have partial write in progress"), + ) + } + async fn get_miss_handler( &'static self, key: &CacheKey, meta: &CacheMeta, _trace: &SpanHandle, ) -> Result { - // TODO: support multiple concurrent writes or panic if the is already a writer let hash = key.combined(); let meta = meta.serialize()?; let temp_obj = TempObject::new(meta); + let temp_id = self.last_temp_id.fetch_add(1, Ordering::Relaxed); let miss_handler = MemMissHandler { body: temp_obj.body.clone(), bytes_written: temp_obj.bytes_written.clone(), key: hash.clone(), cache: self.cached.clone(), temp: self.temp.clone(), + temp_id: temp_id.into(), }; - self.temp.write().insert(hash, temp_obj); + self.temp + .write() + .entry(hash) + .or_default() + .insert(miss_handler.temp_id.into(), temp_obj); Ok(Box::new(miss_handler)) } @@ -526,7 +584,9 @@ mod test { ); let temp_obj = TempObject::new(meta); - cache.temp.write().insert(hash.clone(), temp_obj); + let mut map = HashMap::new(); + map.insert(0, temp_obj); + cache.temp.write().insert(hash.clone(), map); assert!(cache.temp.read().contains_key(&hash)); diff --git a/pingora-cache/src/storage.rs b/pingora-cache/src/storage.rs index 0d4e1041..7cf1e7b3 100644 --- a/pingora-cache/src/storage.rs +++ b/pingora-cache/src/storage.rs @@ -36,13 +36,34 @@ pub enum PurgeType { pub trait Storage { // TODO: shouldn't have to be static - /// Lookup the storage for the given [CacheKey] + /// Lookup the storage for the given [CacheKey]. async fn lookup( &'static self, key: &CacheKey, trace: &SpanHandle, ) -> Result>; + /// Lookup the storage for the given [CacheKey] using a streaming write tag. + /// + /// When streaming partial writes is supported, the request that initiates the write will also + /// pass an optional `streaming_write_tag` so that the storage may try to find the associated + /// [HitHandler], for the same ongoing write. + /// + /// Therefore, when the write tag is set, the storage implementation should either return a + /// [HitHandler] that can be matched to that tag, or none at all. Otherwise when the storage + /// supports concurrent streaming writes for the same key, the calling request may receive a + /// different body from the one it expected. + /// + /// By default this defers to the standard `Storage::lookup` implementation. + async fn lookup_streaming_write( + &'static self, + key: &CacheKey, + _streaming_write_tag: Option<&[u8]>, + trace: &SpanHandle, + ) -> Result> { + self.lookup(key, trace).await + } + /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later. async fn get_miss_handler( &'static self, @@ -130,7 +151,87 @@ pub trait HandleMiss { async fn finish( self: Box, // because self is always used as a trait object ) -> Result; + + /// Return a streaming write tag recognized by the underlying [`Storage`]. + /// + /// This is an arbitrary data identifier that is used to associate this miss handler's current + /// write with a hit handler for the same write. This identifier will be compared by the + /// storage during `lookup_streaming_write`. + // This write tag is essentially an borrowed data blob of bytes retrieved from the miss handler + // and passed to storage, which means it can support strings or small data types, e.g. bytes + // represented by a u64. + // The downside with the current API is that such a data blob must be owned by the miss handler + // and stored in a way that permits retrieval as a byte slice (not computed on the fly). + // But most use cases likely only require a simple integer and may not like the overhead of a + // Vec/String allocation or even a Cow, though such data types can also be used here. + fn streaming_write_tag(&self) -> Option<&[u8]> { + None + } } /// Miss Handler pub type MissHandler = Box<(dyn HandleMiss + Sync + Send)>; + +pub mod streaming_write { + /// Portable u64 (sized) write id convenience type for use with streaming writes. + /// + /// Often an integer value is sufficient for a streaming write tag. This convenience type enables + /// storing such a value and functions for consistent conversion between byte sequence data types. + #[derive(Debug, Clone, Copy)] + pub struct U64WriteId([u8; 8]); + + impl U64WriteId { + pub fn as_bytes(&self) -> &[u8] { + &self.0[..] + } + } + + impl From for U64WriteId { + fn from(value: u64) -> U64WriteId { + U64WriteId(value.to_be_bytes()) + } + } + impl From for u64 { + fn from(value: U64WriteId) -> u64 { + u64::from_be_bytes(value.0) + } + } + impl TryFrom<&[u8]> for U64WriteId { + type Error = std::array::TryFromSliceError; + + fn try_from(value: &[u8]) -> std::result::Result { + Ok(U64WriteId(value.try_into()?)) + } + } + + /// Portable u32 (sized) write id convenience type for use with streaming writes. + /// + /// Often an integer value is sufficient for a streaming write tag. This convenience type enables + /// storing such a value and functions for consistent conversion between byte sequence data types. + #[derive(Debug, Clone, Copy)] + pub struct U32WriteId([u8; 4]); + + impl U32WriteId { + pub fn as_bytes(&self) -> &[u8] { + &self.0[..] + } + } + + impl From for U32WriteId { + fn from(value: u32) -> U32WriteId { + U32WriteId(value.to_be_bytes()) + } + } + impl From for u32 { + fn from(value: U32WriteId) -> u32 { + u32::from_be_bytes(value.0) + } + } + impl TryFrom<&[u8]> for U32WriteId { + type Error = std::array::TryFromSliceError; + + fn try_from(value: &[u8]) -> std::result::Result { + Ok(U32WriteId(value.try_into()?)) + } + } +} diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index 433bc4bb..60e26fb6 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -16,7 +16,7 @@ use super::HttpSession; use crate::connectors::{ConnectorOptions, TransportConnector}; use crate::protocols::http::v1::client::HttpSession as Http1Session; use crate::protocols::http::v2::client::{drive_connection, Http2Session}; -use crate::protocols::{Digest, Stream}; +use crate::protocols::{Digest, Stream, UniqueIDType}; use crate::upstreams::peer::{Peer, ALPN}; use bytes::Bytes; @@ -47,7 +47,7 @@ pub(crate) struct ConnectionRefInner { connection_stub: Stub, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, // max concurrent streams this connection is allowed to create max_streams: usize, // how many concurrent streams already active @@ -69,7 +69,7 @@ impl ConnectionRef { send_req: SendRequest, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, max_streams: usize, digest: Digest, ) -> Self { @@ -98,7 +98,7 @@ impl ConnectionRef { self.0.current_streams.fetch_sub(1, Ordering::SeqCst); } - pub fn id(&self) -> i32 { + pub fn id(&self) -> UniqueIDType { self.0.id } @@ -196,7 +196,7 @@ impl InUsePool { // release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist) // the caller should update the ref and then decide where to put it (in use pool or idle) - fn release(&self, reuse_hash: u64, id: i32) -> Option { + fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option { let pools = self.pools.read(); if let Some(pool) = pools.get(&reuse_hash) { pool.remove(id) diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 8c2ab14e..2b2640bc 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::HttpTask; -use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; +use crate::protocols::{Digest, SocketAddr, Stream, UniqueID, UniqueIDType}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x client session @@ -717,7 +717,7 @@ pub(crate) fn http_req_header_to_wire(req: &RequestHeader) -> Option { } impl UniqueID for HttpSession { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.underlying_stream.id() } } diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 9bdbff46..1d89004a 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch; use crate::connectors::http::v2::ConnectionRef; -use crate::protocols::{Digest, SocketAddr}; +use crate::protocols::{Digest, SocketAddr, UniqueIDType}; pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); @@ -336,7 +336,7 @@ impl Http2Session { } /// the FD of the underlying connection - pub fn fd(&self) -> i32 { + pub fn fd(&self) -> UniqueIDType { self.conn.id() } @@ -427,7 +427,7 @@ use tokio::sync::oneshot; pub async fn drive_connection( mut c: client::Connection, - id: i32, + id: UniqueIDType, closed: watch::Sender, ping_interval: Option, ping_timeout_occurred: Arc, @@ -481,7 +481,7 @@ async fn do_ping_pong( interval: Duration, tx: oneshot::Sender<()>, dropped: Arc, - id: i32, + id: UniqueIDType, ) { // delay before sending the first ping, no need to race with the first request tokio::time::sleep(interval).await; diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index ee91a8aa..edcb188e 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -30,7 +30,7 @@ use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive}; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{ GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, - UniqueID, + UniqueID, UniqueIDType, }; use crate::upstreams::peer::Tracer; @@ -202,7 +202,7 @@ impl AsRawFd for Stream { } impl UniqueID for Stream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.as_raw_fd() } } diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index fb30992a..4c1aa88c 100644 --- a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -32,6 +32,11 @@ use std::fmt::Debug; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; +#[cfg(unix)] +pub type UniqueIDType = i32; +#[cfg(windows)] +pub type UniqueIDType = usize; + /// Define how a protocol should shutdown its connection. #[async_trait] pub trait Shutdown { @@ -42,7 +47,7 @@ pub trait Shutdown { pub trait UniqueID { /// The ID returned should be unique among all existing connections of the same type. /// But ID can be recycled after a connection is shutdown. - fn id(&self) -> i32; + fn id(&self) -> UniqueIDType; } /// Interface to get TLS info @@ -126,7 +131,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Mock { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -154,7 +159,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Cursor { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -182,7 +187,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for DuplexStream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -204,15 +209,27 @@ mod ext_io_impl { } } +#[cfg(unix)] pub(crate) trait ConnFdReusable { fn check_fd_match(&self, fd: V) -> bool; } +#[cfg(windows)] +pub(crate) trait ConnSockReusable { + fn check_sock_match(&self, sock: V) -> bool; +} + use l4::socket::SocketAddr; use log::{debug, error}; +#[cfg(unix)] use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr}; -use std::{net::SocketAddr as InetSocketAddr, os::unix::prelude::AsRawFd, path::Path}; +#[cfg(unix)] +use std::os::unix::prelude::AsRawFd; +#[cfg(windows)] +use std::os::windows::io::AsRawSocket; +use std::{net::SocketAddr as InetSocketAddr, path::Path}; +#[cfg(unix)] impl ConnFdReusable for SocketAddr { fn check_fd_match(&self, fd: V) -> bool { match self { @@ -225,6 +242,16 @@ impl ConnFdReusable for SocketAddr { } } +#[cfg(windows)] +impl ConnSockReusable for SocketAddr { + fn check_sock_match(&self, sock: V) -> bool { + match self { + SocketAddr::Inet(addr) => addr.check_sock_match(sock), + } + } +} + +#[cfg(unix)] impl ConnFdReusable for Path { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); @@ -252,6 +279,7 @@ impl ConnFdReusable for Path { } } +#[cfg(unix)] impl ConnFdReusable for InetSocketAddr { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); diff --git a/pingora-core/src/protocols/tls/mod.rs b/pingora-core/src/protocols/tls/mod.rs index ca353c5b..89fe0d38 100644 --- a/pingora-core/src/protocols/tls/mod.rs +++ b/pingora-core/src/protocols/tls/mod.rs @@ -26,7 +26,7 @@ pub use boringssl_openssl::*; pub mod dummy_tls; use crate::protocols::digest::TimingDigest; -use crate::protocols::{Ssl, UniqueID}; +use crate::protocols::{Ssl, UniqueID, UniqueIDType}; use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl}; use log::warn; use pingora_error::{ErrorType::*, OrErr, Result}; @@ -169,7 +169,7 @@ impl UniqueID for SslStream where T: UniqueID, { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.ssl.get_ref().id() } } diff --git a/pingora-proxy/examples/gateway.rs b/pingora-proxy/examples/gateway.rs index 0bd53306..78f4aae9 100644 --- a/pingora-proxy/examples/gateway.rs +++ b/pingora-proxy/examples/gateway.rs @@ -121,7 +121,7 @@ fn main() { let mut my_proxy = pingora_proxy::http_proxy_service( &my_server.configuration, MyGateway { - req_metric: register_int_counter!("reg_counter", "Number of requests").unwrap(), + req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(), }, ); my_proxy.add_tcp("0.0.0.0:6191"); diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 2d99567e..029ea58f 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -69,9 +69,9 @@ pub trait ProxyHttp { /// Handle the incoming request before any downstream module is executed. /// - /// This function is similar to [Self::request_filter()] but execute before any other logic - /// especially the downstream modules. The main purpose of this function is to provide finer - /// grained control of behavior of the modules. + /// This function is similar to [Self::request_filter()] but executes before any other logic, + /// including downstream module logic. The main purpose of this function is to provide finer + /// grained control of the behavior of the modules. /// /// Note that because this function is executed before any module that might provide access /// control or rate limiting, logic should stay in request_filter() if it can in order to be diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 02174c10..42319a39 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -1516,6 +1516,43 @@ mod test_cache { task3.await.unwrap(); } + #[tokio::test] + async fn test_cache_streaming_multiple_writers() { + // multiple streaming writers don't conflict + init(); + let url = "http://127.0.0.1:6148/slow_body/test_cache_streaming_multiple_writers.txt"; + let task1 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + .header("x-set-hello", "everyone") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello everyone!"); + }); + + let task2 = tokio::spawn(async move { + let res = reqwest::Client::new() + .get(url) + // don't allow using the other streaming write's result + .header("x-force-expire", "1") + .header("x-set-hello", "todo el mundo") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello todo el mundo!"); + }); + + task1.await.unwrap(); + task2.await.unwrap(); + } + #[tokio::test] async fn test_range_request() { init(); diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf index a41a743d..6d5abd73 100644 --- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -408,12 +408,13 @@ http { location /slow_body { content_by_lua_block { local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1 + local hello_to = ngx.var.http_x_set_hello or "world" ngx.flush() ngx.sleep(sleep_sec) ngx.print("hello ") ngx.flush() ngx.sleep(sleep_sec) - ngx.print("world") + ngx.print(hello_to) ngx.sleep(sleep_sec) ngx.print("!") } diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 7db26229..f90a27e9 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -399,6 +399,19 @@ impl ProxyHttp for ExampleProxyCache { Ok(()) } + async fn cache_hit_filter( + &self, + session: &Session, + _meta: &CacheMeta, + _ctx: &mut Self::CTX, + ) -> Result { + // allow test header to control force expiry + if session.get_header_bytes("x-force-expire") != b"" { + return Ok(true); + } + Ok(false) + } + fn cache_vary_filter( &self, meta: &CacheMeta,