Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
aadd07a5b3064b0fbdf57c8c02a5ef7b65b5fc03
28f94f2a402bbf66341bdac8fa670caf5b7311e9
13 changes: 10 additions & 3 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ http = { workspace = true }
log = { workspace = true }
h2 = { workspace = true }
lru = { workspace = true }
nix = "~0.24.3"
clap = { version = "3.2.25", features = ["derive"] }
once_cell = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -46,7 +45,6 @@ libc = "0.2.70"
chrono = { version = "~0.4.31", features = ["alloc"], default-features = false }
thread_local = "1.0"
prometheus = "0.13"
daemonize = "0.5.0"
sentry = { version = "0.26", features = [
"backtrace",
"contexts",
Expand All @@ -69,12 +67,21 @@ tokio-test = "0.4"
zstd = "0"
httpdate = "1"

[target.'cfg(unix)'.dependencies]
daemonize = "0.5.0"
nix = "~0.24.3"

[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.59.0", features = ["Win32_Networking_WinSock"] }

[dev-dependencies]
matches = "0.1"
env_logger = "0.9"
reqwest = { version = "0.11", features = ["rustls"], default-features = false }
hyperlocal = "0.8"
hyper = "0.14"

[target.'cfg(unix)'.dev-dependencies]
hyperlocal = "0.8"
jemallocator = "0.5"

[features]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use log::debug;
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use std::sync::{Arc, Once};

use super::ConnectorOptions;
use crate::connectors::ConnectorOptions;
use crate::protocols::tls::client::handshake;
use crate::protocols::tls::SslStream;
use crate::protocols::IO;
Expand Down
5 changes: 5 additions & 0 deletions pingora-core/src/connectors/tls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[cfg(feature = "some_tls")]
mod boringssl_openssl;

#[cfg(feature = "some_tls")]
pub use boringssl_openssl::*;
5 changes: 5 additions & 0 deletions pingora-core/src/listeners/tls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#[cfg(feature = "some_tls")]
mod boringssl_openssl;

#[cfg(feature = "some_tls")]
pub use boringssl_openssl::*;
2 changes: 1 addition & 1 deletion pingora-core/src/modules/http/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl ResponseCompressionBuilder {
impl HttpModuleBuilder for ResponseCompressionBuilder {
fn init(&self) -> Module {
Box::new(ResponseCompression(ResponseCompressionCtx::new(
self.level, false,
self.level, false, false,
)))
}

Expand Down
129 changes: 108 additions & 21 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ enum CtxInner {
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
decompress_enable: [bool; Algorithm::COUNT],
preserve_etag: [bool; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}
Expand All @@ -78,11 +79,14 @@ impl ResponseCompressionCtx {
/// Create a new [`ResponseCompressionCtx`] with the expected compression level. `0` will disable
/// the compression. The compression level is applied across all algorithms.
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
/// The `preserve_etag` flag indicates whether the ctx should avoid modifying the etag,
/// which will otherwise be weakened if the flag is false and (de)compression is applied.
pub fn new(compression_level: u32, decompress_enable: bool, preserve_etag: bool) -> Self {
Self(CtxInner::HeaderPhase {
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
decompress_enable: [decompress_enable; Algorithm::COUNT],
preserve_etag: [preserve_etag; Algorithm::COUNT],
})
}

Expand Down Expand Up @@ -166,16 +170,38 @@ impl ResponseCompressionCtx {
}
}

/// Adjust preserve etag setting.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_preserve_etag(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase { preserve_etag, .. } => {
*preserve_etag = [enabled; Algorithm::COUNT];
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Adjust preserve etag setting for a specific algorithm.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_algorithm_preserve_etag(&mut self, algorithm: Algorithm, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase { preserve_etag, .. } => {
preserve_etag[algorithm.index()] = enabled;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Feed the request header into this ctx.
pub fn request_filter(&mut self, req: &RequestHeader) {
if !self.is_enabled() {
return;
}
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding,
encoding_levels: _,
accept_encoding, ..
} => parse_accept_encoding(
req.headers.get(http::header::ACCEPT_ENCODING),
accept_encoding,
Expand All @@ -192,6 +218,7 @@ impl ResponseCompressionCtx {
match &self.0 {
CtxInner::HeaderPhase {
decompress_enable,
preserve_etag,
accept_encoding,
encoding_levels: levels,
} => {
Expand Down Expand Up @@ -221,15 +248,22 @@ impl ResponseCompressionCtx {
}

let action = decide_action(resp, accept_encoding);
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
let (encoder, preserve_etag) = match action {
Action::Noop => (None, false),
Action::Compress(algorithm) => {
let idx = algorithm.index();
(algorithm.compressor(levels[idx]), preserve_etag[idx])
}
Action::Decompress(algorithm) => {
algorithm.decompressor(decompress_enable[algorithm.index()])
let idx = algorithm.index();
(
algorithm.decompressor(decompress_enable[idx]),
preserve_etag[idx],
)
}
};
if encoder.is_some() {
adjust_response_header(resp, &action);
adjust_response_header(resp, &action, preserve_etag);
}
self.0 = CtxInner::BodyPhase(encoder);
}
Expand All @@ -242,11 +276,7 @@ impl ResponseCompressionCtx {
/// Return None if the compressed is not enabled
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => panic!("Wrong phase: HeaderPhase"),
CtxInner::HeaderPhase { .. } => panic!("Wrong phase: HeaderPhase"),
CtxInner::BodyPhase(compressor) => {
let result = compressor
.as_mut()
Expand Down Expand Up @@ -718,9 +748,9 @@ fn test_add_vary_header() {
);
}

fn adjust_response_header(resp: &mut ResponseHeader, action: &Action) {
fn adjust_response_header(resp: &mut ResponseHeader, action: &Action, preserve_etag: bool) {
use http::header::{
HeaderValue, ACCEPT_RANGES, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING,
HeaderValue, ACCEPT_RANGES, CONTENT_ENCODING, CONTENT_LENGTH, ETAG, TRANSFER_ENCODING,
};

fn set_stream_headers(resp: &mut ResponseHeader) {
Expand All @@ -734,16 +764,49 @@ fn adjust_response_header(resp: &mut ResponseHeader, action: &Action) {
.unwrap();
}

fn weaken_or_clear_etag(resp: &mut ResponseHeader) {
// RFC9110: https://datatracker.ietf.org/doc/html/rfc9110#section-8.8.1
// "a validator is weak if it is shared by two or more representations
// of a given resource at the same time, unless those representations
// have identical representation data"
// Follow nginx gzip filter's example when changing content encoding:
// - if the ETag is not a valid strong ETag, clear it (i.e. does not start with `"`)
// - else, weaken it
if let Some(etag) = resp.headers.get(&ETAG) {
let etag_bytes = etag.as_bytes();
if etag_bytes.starts_with(b"W/") {
// this is already a weak ETag, noop
} else if etag_bytes.starts_with(b"\"") {
// strong ETag, weaken since we are changing the byte representation
let weakened_etag = HeaderValue::from_bytes(&[b"W/", etag_bytes].concat())
.expect("valid header value prefixed with \"W/\" should remain valid");
resp.insert_header(&ETAG, weakened_etag)
.expect("can insert weakened etag when etag was already valid");
} else {
// invalid strong ETag, just clear it
// https://datatracker.ietf.org/doc/html/rfc9110#section-8.8.3
// says the opaque-tag section needs to be a quoted string
resp.remove_header(&ETAG);
}
}
}

match action {
Action::Noop => { /* do nothing */ }
Action::Decompress(_) => {
resp.remove_header(&CONTENT_ENCODING);
set_stream_headers(resp)
set_stream_headers(resp);
if !preserve_etag {
weaken_or_clear_etag(resp);
}
}
Action::Compress(a) => {
resp.insert_header(&CONTENT_ENCODING, HeaderValue::from_static(a.as_str()))
.unwrap();
set_stream_headers(resp)
set_stream_headers(resp);
if !preserve_etag {
weaken_or_clear_etag(resp);
}
}
}
}
Expand All @@ -758,7 +821,8 @@ fn test_adjust_response_header() {
header.insert_header("content-length", "20").unwrap();
header.insert_header("content-encoding", "gzip").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Noop);
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Noop, false);
assert_eq!(
header.headers.get("content-encoding").unwrap().as_bytes(),
b"gzip"
Expand All @@ -767,27 +831,45 @@ fn test_adjust_response_header() {
header.headers.get("content-length").unwrap().as_bytes(),
b"20"
);
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"\"abc123\""
);
assert!(header.headers.get("transfer-encoding").is_none());

// decompress gzip
let mut header = ResponseHeader::build(200, None).unwrap();
header.insert_header("content-length", "20").unwrap();
header.insert_header("content-encoding", "gzip").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip));
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip), false);
assert!(header.headers.get("content-encoding").is_none());
assert!(header.headers.get("content-length").is_none());
assert_eq!(
header.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"
);
assert!(header.headers.get("accept-ranges").is_none());
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"W/\"abc123\""
);
// when preserve_etag on, strong etag is kept
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip), true);
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"\"abc123\""
);

// compress
let mut header = ResponseHeader::build(200, None).unwrap();
header.insert_header("content-length", "20").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Compress(Gzip));
// try invalid etag, should be cleared
header.insert_header("etag", "abc123").unwrap();
adjust_response_header(&mut header, &Compress(Gzip), false);
assert_eq!(
header.headers.get("content-encoding").unwrap().as_bytes(),
b"gzip"
Expand All @@ -798,4 +880,9 @@ fn test_adjust_response_header() {
header.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"
);
assert!(header.headers.get("etag").is_none());
// when preserve_etag on, etag is kept
header.insert_header("etag", "abc123").unwrap();
adjust_response_header(&mut header, &Compress(Gzip), true);
assert_eq!(header.headers.get("etag").unwrap().as_bytes(), b"abc123");
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

//! TLS client specific implementation

use super::SslStream;
use crate::protocols::raw_connect::ProxyDigest;
use crate::protocols::tls::SslStream;
use crate::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, TimingDigest, IO,
};
Expand Down
2 changes: 2 additions & 0 deletions pingora-core/src/protocols/tls/boringssl_openssl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod client;
pub mod server;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! TLS server specific implementation

use super::SslStream;
use crate::protocols::tls::SslStream;
use crate::protocols::{Shutdown, IO};
use crate::tls::ext;
use crate::tls::ext::ssl_from_acceptor;
Expand Down
8 changes: 6 additions & 2 deletions pingora-core/src/protocols/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

//! The TLS layer implementations

pub mod client;
pub mod digest;
pub mod server;

#[cfg(feature = "some_tls")]
mod boringssl_openssl;

#[cfg(feature = "some_tls")]
pub use boringssl_openssl::*;

#[cfg(not(feature = "some_tls"))]
pub mod dummy_tls;
Expand Down
1 change: 1 addition & 0 deletions pingora-core/src/server/transfer_fd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ pub fn get_fds_from<P>(_path: &P, _payload: &mut [u8]) -> Result<(Vec<RawFd>, us
where
P: ?Sized + NixPath + std::fmt::Display,
{
log::error!("Upgrade is not currently supported outside of Linux platforms");
Err(Errno::ECONNREFUSED)
}

Expand Down
2 changes: 2 additions & 0 deletions pingora-core/tests/test_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod utils;

use hyper::Client;
#[cfg(unix)]
use hyperlocal::{UnixClientExt, Uri};
use utils::init;

Expand Down Expand Up @@ -51,6 +52,7 @@ async fn test_https_http2() {
assert_eq!(res.version(), reqwest::Version::HTTP_11);
}

#[cfg(unix)]
#[cfg(feature = "some_tls")]
#[tokio::test]
async fn test_uds() {
Expand Down
1 change: 1 addition & 0 deletions pingora-core/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fn entry_point(opt: Option<Opt>) {
my_server.bootstrap();

let mut listeners = Listeners::tcp("0.0.0.0:6145");
#[cfg(unix)]
listeners.add_uds("/tmp/echo.sock", None);

let mut tls_settings =
Expand Down
Loading