31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -59,6 +59,7 @@ members = [
"src/mz",
"src/mz-debug",
"src/npm",
"src/oidc-mock",
"src/orchestrator",
"src/orchestrator-kubernetes",
"src/orchestrator-process",
@@ -182,6 +183,7 @@ default-members = [
"src/mz",
"src/mz-debug",
"src/npm",
"src/oidc-mock",
"src/orchestrator",
"src/orchestrator-kubernetes",
"src/orchestrator-process",
16 changes: 5 additions & 11 deletions doc/developer/design/20251215_oidc_authentication.md
@@ -51,12 +51,6 @@ spec:
# Note: Not all JWT providers support .well-known/openid-configuration,
# so use jwks directly if the provider doesn't support it.
jwksFetchFromIssuer: true
# The OAuth 2.0 token endpoint where Materialize will request new access
# tokens using a refresh token (https://www.rfc-editor.org/rfc/rfc6749.html#section-6).
# Requires `grant_type` of `refresh_token` in the client.
# Optional. If not provided, sessions will expire when the access token expires
# and refresh tokens in the password field will be ignored.
tokenEndpoint: https://dev-123456.okta.com/oauth2/default/v1/token
```
Where in environmentd, it’ll look like so:
@@ -82,9 +76,13 @@ When a user first logs in with a valid token, we create a role for them if one d

### Solution proposal: The user should be disabled from logging in when a user is removed from the upstream IDP. However, the database level role should still exist.

When doing pgwire OIDC authentication, we can accept a cleartext password that is the access token. The OIDC authenticator will do JWT authentication on the access token. If the token is expired, the session will not be established. If a session has already been authenticated and established, we will not invalidate it when the token later expires. Once the token expires, the user will not be able to authenticate a new session. This trades some security for developer experience, but is acceptable since organizations will have supplemental methods of deprovisioning users outside of the database. The result is that a removed user can no longer log in, while the database role still exists.
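
As an illustration only, the establishment-time check described in this paragraph could look roughly like the sketch below. It assumes the token's signature has already been verified; `VerifiedClaims`, `EstablishError`, and `check_at_establishment` are hypothetical names, not types from this change.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical container for claims of an already signature-verified JWT.
/// Only the `exp` claim matters for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct VerifiedClaims {
    /// Expiration time in seconds since the Unix epoch (the JWT `exp` claim).
    pub exp: u64,
}

/// Hypothetical error type for a rejected session.
#[derive(Debug, PartialEq, Eq)]
pub enum EstablishError {
    TokenExpired,
}

/// Decide whether a new session may be established at time `now`.
/// The check runs once, at establishment; nothing re-checks the token later.
pub fn check_at_establishment(
    claims: VerifiedClaims,
    now: SystemTime,
) -> Result<(), EstablishError> {
    let expires_at = UNIX_EPOCH + Duration::from_secs(claims.exp);
    // A token is only usable strictly before its expiration time.
    if now >= expires_at {
        Err(EstablishError::TokenExpired)
    } else {
        Ok(())
    }
}
```

Because the check happens only here, an established session outlives its token, which is the tradeoff the paragraph accepts.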

**Alternative: Use a refresh token flow to invalidate active sessions**

When doing pgwire Oidc authentication, we can accept a cleartext password of the form `access=<ACCESS_TOKEN>&refresh=<REFRESH_TOKEN>` where `&` is a delimiter and `refresh=<REFRESH_TOKEN>` is optional. The OIDC authenticator will then try to authenticate again and fetch a new access token using the refresh token when close to expiration (using the token API URL in the spec above). If the refresh token doesn’t exist, the session will invalidate. This would require users to have their IdP client generate `refresh` tokens. For token expiration checking, in a task, we'll repeatedly wait for `(expiration - now) * 0.8` and see if it's less than a minute. This is also how we check token expiration in the Frontegg authenticator. We'll also implement a config variable to turn off this mechanism and have it default to true.
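
A minimal sketch of the two mechanics in this alternative: parsing the `access=<ACCESS_TOKEN>&refresh=<REFRESH_TOKEN>` password and choosing the next wait. It reads the expiry rule as "refresh once less than a minute remains, otherwise sleep for 80% of the remaining lifetime", which is one interpretation of the sentence above. All names (`PasswordTokens`, `parse_password`, `NextStep`, `next_step`) are hypothetical, and the parser assumes tokens never contain `&`.

```rust
use std::time::Duration;

/// Tokens carried in the cleartext password field (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct PasswordTokens {
    pub access: String,
    pub refresh: Option<String>,
}

/// Parse `access=<ACCESS_TOKEN>&refresh=<REFRESH_TOKEN>`.
/// The `refresh=` part is optional; `access=` is required.
/// Assumption: neither token contains the `&` delimiter.
pub fn parse_password(password: &str) -> Option<PasswordTokens> {
    let mut access = None;
    let mut refresh = None;
    for part in password.split('&') {
        // Split on the first `=` only, so values may themselves contain `=`.
        match part.split_once('=') {
            Some(("access", v)) if !v.is_empty() => access = Some(v.to_string()),
            Some(("refresh", v)) if !v.is_empty() => refresh = Some(v.to_string()),
            _ => return None,
        }
    }
    Some(PasswordTokens { access: access?, refresh })
}

/// What the expiry-checking task should do next (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    /// Sleep for this long, then check again.
    Sleep(Duration),
    /// Less than a minute of lifetime is left: refresh the access token now.
    RefreshNow,
}

/// Given the remaining token lifetime (`expiration - now`), pick the next step.
pub fn next_step(remaining: Duration) -> NextStep {
    const REFRESH_THRESHOLD: Duration = Duration::from_secs(60);
    if remaining < REFRESH_THRESHOLD {
        NextStep::RefreshNow
    } else {
        // 4/5 of the remaining lifetime, i.e. the `* 0.8` factor from the text.
        NextStep::Sleep(remaining * 4 / 5)
    }
}
```

With a 100-second lifetime the task would sleep 80 seconds, wake with 20 seconds left, and refresh immediately.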

By suggesting a short time to live for access tokens, this accomplishes invalidating sessions on removal of a user from an IDP. When admins remove a user from an IDP, the next time the user tries to authenticate or refresh their access token, the token API will not allow the user to login but will keep the role in the database.
This approach would enhance security by ensuring that sessions are invalidated once the access token expires. However, it would also introduce additional complexity and degrade the developer experience, as it would require users to configure refresh tokens in their IdP. Additionally, some IdPs may impose rate limits on token refresh operations. By opting for a simpler design, we minimize potential incompatibilities with various IdPs.

**Alternative: Use SASL Authentication using the OAUTHBEARER mechanism rather than a cleartext password**

@@ -124,12 +122,8 @@ An MVP of what this might look like exists here: [https://github.com/Materialize
### Tests:

- Successful login (e2e mzcompose)
- Invalidating the session on access token expiration and no refresh token (Rust unit test)
- A token should successfully refresh if the access token and refresh token are valid (Rust unit test)
- Session should error if access token is invalid (Rust unit test)
- Session should error if refresh token is invalid (Rust unit test)
- A user shouldn't be able to login as another user (Rust unit test)
- Removing a user from the upstream IDP should invalidate the refresh token (e2e mzcompose)
- Platform-check simple login check (platform-check framework)
- JWTs should only be accepted when a valid JWK is set (we do not want to accept JWTs that are not signed with a real, cryptographically sound key)

51 changes: 33 additions & 18 deletions src/adapter/src/client.rs
@@ -21,6 +21,7 @@ use derivative::Derivative;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_auth::Authenticated;
use mz_auth::password::Password;
use mz_build_info::BuildInfo;
use mz_compute_types::ComputeInstanceId;
@@ -33,6 +34,7 @@ use mz_ore::result::ResultExt;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::thread::JoinOnDropHandle;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::user::InternalUserMetadata;
use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::{EnvironmentId, SessionCatalog};
@@ -51,8 +53,8 @@ use uuid::Uuid;

use crate::catalog::Catalog;
use crate::command::{
AuthResponse, CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response,
SASLChallengeResponse, SASLVerifyProofResponse,
CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
SASLVerifyProofResponse, SuperuserAttribute,
};
use crate::coord::{Coordinator, ExecuteContextGuard};
use crate::error::AdapterError;
@@ -148,27 +150,30 @@ impl Client {
/// Creates a new session associated with this client for the given user.
///
/// It is the caller's responsibility to have authenticated the user.
pub fn new_session(&self, config: SessionConfig) -> Session {
/// We pass in an Authenticated marker as a guardrail to ensure the
/// user has authenticated with an authenticator before creating a session.
pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
// We use the system clock to determine when a session connected to Materialize. This is not
// intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
// generating a more correct timestamp.
Session::new(self.build_info, config, self.metrics().session_metrics())
}

/// Preforms an authentication check for the given user.
/// Verifies the provided user's password against the
/// stored credentials in the catalog.
pub async fn authenticate(
&self,
user: &String,
password: &Password,
) -> Result<AuthResponse, AdapterError> {
) -> Result<Authenticated, AdapterError> {
let (tx, rx) = oneshot::channel();
self.send(Command::AuthenticatePassword {
role_name: user.to_string(),
password: Some(password.clone()),
tx,
});
let response = rx.await.expect("sender dropped")?;
Ok(response)
rx.await.expect("sender dropped")?;
Ok(Authenticated)
}

pub async fn generate_sasl_challenge(
@@ -192,7 +197,7 @@ impl Client {
proof: &String,
nonce: &String,
mock_hash: &String,
) -> Result<SASLVerifyProofResponse, AdapterError> {
) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
let (tx, rx) = oneshot::channel();
self.send(Command::AuthenticateVerifySASLProof {
role_name: user.to_string(),
@@ -202,7 +207,7 @@ impl Client {
tx,
});
let response = rx.await.expect("sender dropped")?;
Ok(response)
Ok((response, Authenticated))
}

/// Upgrades this client to a session client.
@@ -265,6 +270,7 @@ impl Client {
optimizer_metrics,
persist_client,
statement_logging_frontend,
superuser_attribute,
} = response;

let peek_client = PeekClient::new(
@@ -287,6 +293,13 @@ impl Client {
};

let session = client.session();

// Apply the superuser attribute to the session's user if
// it exists.
if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
session.apply_internal_user_metadata(InternalUserMetadata { superuser });
}
Contributor:

I have no idea how the internal user metadata works, or why we were doing the round trip with the catalog for it. My review will not cover whether this is correct.

Contributor, Author:

I can ask someone on sql team for eyes on this commit specifically!

Contributor, Author (@SangJunBak, Jan 22, 2026):

Asked dov to review this commit!

session.initialize_role_metadata(role_id);
let vars_mut = session.vars_mut();
for (name, val) in session_defaults {
@@ -438,15 +451,17 @@ Issue a SQL query to get started. Need help?
) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
// Connect to the coordinator.
let conn_id = self.new_conn_id()?;
let session = self.new_session(SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SUPPORT_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
internal_user_metadata: None,
helm_chart_version: None,
});
let session = self.new_session(
SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SUPPORT_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
helm_chart_version: None,
},
Authenticated,
);
let mut session_client = self.startup(session).await?;

// Parse the SQL statement.
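
For readers skimming this file's changes, here is a self-contained sketch of the marker-argument pattern that `new_session` now uses. The types are simplified stand-ins, not the real `mz_auth` or adapter definitions, and the password comparison is a placeholder for the coordinator round trip in `Client::authenticate`.

```rust
/// Stand-in for `mz_auth::Authenticated`: a zero-sized marker value.
#[derive(Debug, Clone, Copy)]
pub struct Authenticated;

/// Simplified stand-in for the adapter's `SessionConfig`.
pub struct SessionConfig {
    pub user: String,
}

/// Simplified stand-in for the adapter's `Session`.
pub struct Session {
    pub user: String,
}

/// Hypothetical error type for a failed authentication.
#[derive(Debug, PartialEq, Eq)]
pub struct AuthError;

/// Placeholder authenticator: on success it hands back the marker.
/// (Assumption: the real check asks the coordinator; this one compares strings.)
pub fn authenticate(password: &str, stored: &str) -> Result<Authenticated, AuthError> {
    if password == stored {
        Ok(Authenticated)
    } else {
        Err(AuthError)
    }
}

/// Creating a session requires the marker, so every call site has to
/// state where its `Authenticated` value came from.
pub fn new_session(config: SessionConfig, _authenticated: Authenticated) -> Session {
    Session { user: config.user }
}
```

As in the diff above, internal callers such as the support-user path pass `Authenticated` directly, so the marker works as a guardrail at call sites rather than as proof that a check ran.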
21 changes: 9 additions & 12 deletions src/adapter/src/command.rs
@@ -86,7 +86,7 @@ pub enum Command {
},

AuthenticatePassword {
tx: oneshot::Sender<Result<AuthResponse, AdapterError>>,
tx: oneshot::Sender<Result<(), AdapterError>>,
role_name: String,
password: Option<Password>,
},
@@ -372,12 +372,20 @@ pub struct Response<T> {
pub otel_ctx: OpenTelemetryContext,
}

#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);

/// The response to [`Client::startup`](crate::Client::startup).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StartupResponse {
/// RoleId for the user.
pub role_id: RoleId,
/// The role's superuser attribute in the Catalog.
/// This attribute is None for Cloud. Cloud is able
/// to derive the role's superuser status from
/// external_metadata_rx.
pub superuser_attribute: SuperuserAttribute,
/// A future that completes when all necessary Builtin Table writes have completed.
#[derivative(Debug = "ignore")]
pub write_notify: BuiltinTableAppendNotify,
@@ -396,16 +404,6 @@ pub struct StartupResponse {
pub statement_logging_frontend: StatementLoggingFrontend,
}

/// The response to [`Client::authenticate`](crate::Client::authenticate).
#[derive(Derivative)]
#[derivative(Debug)]
pub struct AuthResponse {
/// RoleId for the user.
pub role_id: RoleId,
/// If the user is a superuser.
pub superuser: bool,
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct SASLChallengeResponse {
@@ -419,7 +417,6 @@ pub struct SASLChallengeResponse {
#[derivative(Debug)]
pub struct SASLVerifyProofResponse {
pub verifier: String,
pub auth_resp: AuthResponse,
}

// Facile implementation for `StartupResponse`, which does not use the `allowed`
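
A rough sketch of how the new `SuperuserAttribute` is consumed at startup, mirroring the `if let` added in `client.rs`. `Session` and `InternalUserMetadata` here are simplified stand-ins for the real types, and `apply_superuser_attribute` is a hypothetical helper name.

```rust
/// Same shape as the struct added in this file.
#[derive(Debug, Clone, Copy)]
pub struct SuperuserAttribute(pub Option<bool>);

/// Simplified stand-in for `mz_repr::user::InternalUserMetadata`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InternalUserMetadata {
    pub superuser: bool,
}

/// Simplified stand-in for the adapter's `Session`.
#[derive(Debug, Default)]
pub struct Session {
    internal_user_metadata: Option<InternalUserMetadata>,
}

impl Session {
    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
        self.internal_user_metadata = Some(metadata);
    }

    pub fn internal_user_metadata(&self) -> Option<InternalUserMetadata> {
        self.internal_user_metadata
    }
}

/// Apply the catalog's superuser attribute only when it is present.
/// `None` leaves the session untouched, matching the doc comment's note that
/// Cloud derives superuser status from external metadata instead.
pub fn apply_superuser_attribute(session: &mut Session, attr: SuperuserAttribute) {
    if let SuperuserAttribute(Some(superuser)) = attr {
        session.apply_internal_user_metadata(InternalUserMetadata { superuser });
    }
}
```

Wrapping the `Option<bool>` in a named type keeps "no attribute" distinct from "not a superuser" at the call site.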
21 changes: 12 additions & 9 deletions src/adapter/src/config/backend.rs
@@ -9,6 +9,7 @@

use std::collections::BTreeMap;

use mz_auth::Authenticated;
use mz_sql::session::user::SYSTEM_USER;
use tracing::{error, info};
use uuid::Uuid;
@@ -28,15 +29,17 @@ pub struct SystemParameterBackend {
impl SystemParameterBackend {
pub async fn new(client: Client) -> Result<Self, AdapterError> {
let conn_id = client.new_conn_id()?;
let session = client.new_session(SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SYSTEM_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
internal_user_metadata: None,
helm_chart_version: None,
});
let session = client.new_session(
SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SYSTEM_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
helm_chart_version: None,
},
Authenticated,
);
let session_client = client.startup(session).await?;
Ok(Self { session_client })
}