Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
82f4584
Extract OidcAuthenticator trait from Frontegg Authenticator
SangJunBak Jan 12, 2026
424f438
Extract password retrieval into own method
SangJunBak Jan 14, 2026
fdb2c9a
Implement OIDC prototype
SangJunBak Jan 15, 2026
8c1a6f9
Refactor internal_user_metadata out of session clients
SangJunBak Jan 20, 2026
4e64ba3
Add OIDC mock server for testing
SangJunBak Jan 21, 2026
7aa7db6
Implement OIDC option extraction
SangJunBak Jan 20, 2026
ffadefd
Fix lint errors
SangJunBak Jan 21, 2026
9e19191
Fetch jwks uri from openid-configuration endpoint
SangJunBak Jan 22, 2026
92eb3d9
Annotate mz imports with default-features
SangJunBak Jan 22, 2026
d0a1751
Refactor OIDC issuer URL documentation and improve password handling …
SangJunBak Jan 22, 2026
021de64
Add OIDC audience validation support
SangJunBak Jan 21, 2026
1b04b15
Combine mz-authenticator-types into mz-auth
SangJunBak Jan 26, 2026
ab4c25f
Remove common OIDC authenticator
SangJunBak Jan 26, 2026
436dd3f
Introduce sentinel type for authentication
SangJunBak Jan 26, 2026
6794a1a
Wrap superuser attribute in a custom struct
SangJunBak Jan 27, 2026
e407342
Extend OIDC traits and enum for password fallback
SangJunBak Jan 22, 2026
7cf6b0a
Add password fallback for OIDC pgwire
SangJunBak Jan 22, 2026
eae81b4
Add test_auth_oidc_password_fallback test
SangJunBak Jan 26, 2026
c04b2aa
Decrease unions in recursion limit test
SangJunBak Jan 27, 2026
14ef2d8
Update OIDC tests to use system parameter defaults as config
SangJunBak Jan 27, 2026
4effde6
Add an OIDC test where OIDC config can change
SangJunBak Jan 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ members = [
"src/mz",
"src/mz-debug",
"src/npm",
"src/oidc-mock",
"src/orchestrator",
"src/orchestrator-kubernetes",
"src/orchestrator-process",
Expand Down Expand Up @@ -182,6 +183,7 @@ default-members = [
"src/mz",
"src/mz-debug",
"src/npm",
"src/oidc-mock",
"src/orchestrator",
"src/orchestrator-kubernetes",
"src/orchestrator-process",
Expand Down
16 changes: 5 additions & 11 deletions doc/developer/design/20251215_oidc_authentication.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ spec:
# Note: Not all JWT providers support .well-known/openid-configuration,
# so use jwks directly if the provider doesn't support it.
jwksFetchFromIssuer: true
# The OAuth 2.0 token endpoint where Materialize will request new access
# tokens using a refresh token (https://www.rfc-editor.org/rfc/rfc6749.html#section-6).
# Requires `grant_type` of `refresh_token` in the client.
# Optional. If not provided, sessions will expire when the access token expires
# and refresh tokens in the password field will be ignored.
tokenEndpoint: https://dev-123456.okta.com/oauth2/default/v1/token
```

Where in environmentd, it’ll look like so:
Expand All @@ -82,9 +76,13 @@ When a user first logs in with a valid token, we create a role for them if one d

### Solution proposal: The user should be disabled from logging in when a user is removed from the upstream IDP. However, the database level role should still exist.

When doing pgwire Oidc authentication, we can accept a cleartext password that is the access token. The OIDC authenticator will do JWT authentication on the access token. If the token is expired, the session will not be established. We will not do any invalidation on the session if the session has already been authenticated/established, but the token is expired. Eventually, the token will expire and the user will not be able to authenticate a new session. This creates a tradeoff between security and developer experience, but is acceptable since organizations will have supplemental methods of deprovisioning users outside of the database. This accomplishes disabling a user from logging in, but the database role still existing.

**Alternative: Use a refresh token flow to invalidate active sessions**

When doing pgwire Oidc authentication, we can accept a cleartext password of the form `access=<ACCESS_TOKEN>&refresh=<REFRESH_TOKEN>` where `&` is a delimiter and `refresh=<REFRESH_TOKEN>` is optional. The OIDC authenticator will then try to authenticate again and fetch a new access token using the refresh token when close to expiration (using the token API URL in the spec above). If the refresh token doesn’t exist, the session will invalidate. This would require users to have their IdP client generate `refresh` tokens. For token expiration checking, in a task, we'll repeatedly wait for `(expiration - now) * 0.8` and see if it's less than a minute. This is also how we check token expiration in the Frontegg authenticator. We'll also implement a config variable to turn off this mechanism and have it default to true.

By suggesting a short time to live for access tokens, this accomplishes invalidating sessions on removal of a user from an IDP. When admins remove a user from an IDP, the next time the user tries to authenticate or refresh their access token, the token API will not allow the user to login but will keep the role in the database.
This approach would enhance security by ensuring that sessions are invalidated once the access token expires. However, it would also introduce additional complexity and degrade the developer experience, as it would require users to configure refresh tokens in their IdP. Additionally, some IdPs may impose rate limits on token refresh operations. By opting for a simpler design, we minimize potential incompatibilities with various IdPs.

**Alternative: Use SASL Authentication using the OAUTHBEARER mechanism rather than a cleartext password**

Expand Down Expand Up @@ -124,12 +122,8 @@ An MVP of what this might look like exists here: [https://github.com/Materialize
### Tests:

- Successful login (e2e mzcompose)
- Invalidating the session on access token expiration and no refresh token (Rust unit test)
- A token should successfully refresh if the access token and refresh token are valid (Rust unit test)
- Session should error if access token is invalid (Rust unit test)
- Session should error if refresh token is invalid (Rust unit test)
- A user shouldn't be able to login as another user (Rust unit test)
- Removing a user from the upstream IDP should invalidate the refresh token (e2e mzcompose)
- Platform-check simple login check (platform-check framework)
- JWTs should only be accepted when a valid JWK is set (we do not want to accept JWTs that are not signed with a real, cryptographically sound key)

Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ def get_default_system_parameters(
"kafka_retry_backoff_max",
"kafka_reconnect_backoff",
"kafka_reconnect_backoff_max",
"oidc_issuer",
"oidc_audience",
]


Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,8 @@ def __init__(
"kafka_reconnect_backoff_max",
"pg_source_validate_timeline",
"sql_server_source_validate_restore_history",
"oidc_issuer",
"oidc_audience",
]

def run(self, exe: Executor) -> bool:
Expand Down
12 changes: 12 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ pub const ENABLE_PASSWORD_AUTH: Config<bool> = Config::new(
"Enable password authentication.",
);

/// OIDC issuer URL.
pub const OIDC_ISSUER: Config<&'static str> = Config::new("oidc_issuer", "", "OIDC issuer URL.");

/// OIDC audience (client ID). When empty, audience validation is skipped.
pub const OIDC_AUDIENCE: Config<&'static str> = Config::new(
"oidc_audience",
"",
"OIDC audience (client ID). When empty, audience validation is skipped.",
);

pub const CONSTRAINT_BASED_TIMESTAMP_SELECTION: Config<&'static str> = Config::new(
"constraint_based_timestamp_selection",
ConstraintBasedTimestampSelection::const_default().as_str(),
Expand Down Expand Up @@ -157,6 +167,8 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_EXPRESSION_CACHE)
.add(&ENABLE_MULTI_REPLICA_SOURCES)
.add(&ENABLE_PASSWORD_AUTH)
.add(&OIDC_ISSUER)
.add(&OIDC_AUDIENCE)
.add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION)
.add(&PERSIST_FAST_PATH_ORDER)
.add(&ENABLE_S3_TABLES_REGION_CHECK)
Expand Down
51 changes: 33 additions & 18 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use derivative::Derivative;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_auth::Authenticated;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_compute_types::ComputeInstanceId;
Expand All @@ -33,6 +34,7 @@ use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::thread::JoinOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::user::InternalUserMetadata;
use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{EnvironmentId, SessionCatalog};
Expand All @@ -51,8 +53,8 @@ use uuid::Uuid;

use crate::catalog::Catalog;
use crate::command::{
AuthResponse, CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response,
SASLChallengeResponse, SASLVerifyProofResponse,
CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
SASLVerifyProofResponse, SuperuserAttribute,
};
use crate::coord::{Coordinator, ExecuteContextGuard};
use crate::error::AdapterError;
Expand Down Expand Up @@ -148,27 +150,30 @@ impl Client {
/// Creates a new session associated with this client for the given user.
///
/// It is the caller's responsibility to have authenticated the user.
pub fn new_session(&self, config: SessionConfig) -> Session {
/// We pass in an Authenticated marker as a guardrail to ensure the
/// user has authenticated with an authenticator before creating a session.
pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
// We use the system clock to determine when a session connected to Materialize. This is not
// intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
// generating a more correct timestamp.
Session::new(self.build_info, config, self.metrics().session_metrics())
}

/// Preforms an authentication check for the given user.
/// Verifies the provided user's password against the
/// stored credentials in the catalog.
pub async fn authenticate(
&self,
user: &String,
password: &Password,
) -> Result<AuthResponse, AdapterError> {
) -> Result<Authenticated, AdapterError> {
let (tx, rx) = oneshot::channel();
self.send(Command::AuthenticatePassword {
role_name: user.to_string(),
password: Some(password.clone()),
tx,
});
let response = rx.await.expect("sender dropped")?;
Ok(response)
rx.await.expect("sender dropped")?;
Ok(Authenticated)
}

pub async fn generate_sasl_challenge(
Expand All @@ -192,7 +197,7 @@ impl Client {
proof: &String,
nonce: &String,
mock_hash: &String,
) -> Result<SASLVerifyProofResponse, AdapterError> {
) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
let (tx, rx) = oneshot::channel();
self.send(Command::AuthenticateVerifySASLProof {
role_name: user.to_string(),
Expand All @@ -202,7 +207,7 @@ impl Client {
tx,
});
let response = rx.await.expect("sender dropped")?;
Ok(response)
Ok((response, Authenticated))
}

/// Upgrades this client to a session client.
Expand Down Expand Up @@ -265,6 +270,7 @@ impl Client {
optimizer_metrics,
persist_client,
statement_logging_frontend,
superuser_attribute,
} = response;

let peek_client = PeekClient::new(
Expand All @@ -287,6 +293,13 @@ impl Client {
};

let session = client.session();

// Apply the superuser attribute to the session's user if
// it exists.
if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
session.apply_internal_user_metadata(InternalUserMetadata { superuser });
}

session.initialize_role_metadata(role_id);
let vars_mut = session.vars_mut();
for (name, val) in session_defaults {
Expand Down Expand Up @@ -438,15 +451,17 @@ Issue a SQL query to get started. Need help?
) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
// Connect to the coordinator.
let conn_id = self.new_conn_id()?;
let session = self.new_session(SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SUPPORT_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
internal_user_metadata: None,
helm_chart_version: None,
});
let session = self.new_session(
SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SUPPORT_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
helm_chart_version: None,
},
Authenticated,
);
let mut session_client = self.startup(session).await?;

// Parse the SQL statement.
Expand Down
21 changes: 9 additions & 12 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum Command {
},

AuthenticatePassword {
tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
tx: oneshot::Sender<Result<(), AdapterError>>,
role_name: String,
password: Option<Password>,
},
Expand Down Expand Up @@ -372,12 +372,20 @@ pub struct Response<T> {
pub otel_ctx: OpenTelemetryContext,
}

#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);

/// The response to [`Client::startup`](crate::Client::startup).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
/// RoleId for the user.
pub role_id: RoleId,
/// The role's superuser attribute in the Catalog.
/// This attribute is None for Cloud. Cloud is able
/// to derive the role's superuser status from
/// external_metadata_rx.
pub superuser_attribute: SuperuserAttribute,
/// A future that completes when all necessary Builtin Table writes have completed.
#[derivative(Debug = "ignore")]
pub write_notify: BuiltinTableAppendNotify,
Expand All @@ -396,16 +404,6 @@ pub struct StartupResponse {
pub statement_logging_frontend: StatementLoggingFrontend,
}

/// The response to [`Client::authenticate`](crate::Client::authenticate).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct AuthResponse {
/// RoleId for the user.
pub role_id: RoleId,
/// If the user is a superuser.
pub superuser: bool,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
Expand All @@ -419,7 +417,6 @@ pub struct SASLChallengeResponse {
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
pub verifier: String,
pub auth_resp: AuthResponse,
}

// Facile implementation for `StartupResponse`, which does not use the `allowed`
Expand Down
21 changes: 12 additions & 9 deletions src/adapter/src/config/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::collections::BTreeMap;

use mz_auth::Authenticated;
use mz_sql::session::user::SYSTEM_USER;
use tracing::{error, info};
use uuid::Uuid;
Expand All @@ -28,15 +29,17 @@ pub struct SystemParameterBackend {
impl SystemParameterBackend {
pub async fn new(client: Client) -> Result<Self, AdapterError> {
let conn_id = client.new_conn_id()?;
let session = client.new_session(SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SYSTEM_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
internal_user_metadata: None,
helm_chart_version: None,
});
let session = client.new_session(
SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SYSTEM_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
helm_chart_version: None,
},
Authenticated,
);
let session_client = client.startup(session).await?;
Ok(Self { session_client })
}
Expand Down
Loading