Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/packages/api-peer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rivet-api-util.workspace = true
rivet-config.workspace = true
rivet-error.workspace = true
rivet-pools.workspace = true
rivet-runner-protocol.workspace = true
rivet-util.workspace = true
rivet-types.workspace = true
serde.workspace = true
Expand Down
93 changes: 93 additions & 0 deletions engine/packages/api-peer/src/actors/kv_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use anyhow::*;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use pegboard_actor_kv as actor_kv;
use rivet_api_builder::ApiCtx;
use rivet_runner_protocol::mk2 as rp;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvListPath {
pub actor_id: Id,
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct KvListQuery {
/// Base64 encoded key prefix to filter by
pub prefix: Option<String>,
/// Number of results to return (default 100, max 1000)
pub limit: Option<usize>,
/// Whether to reverse the order
pub reverse: Option<bool>,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvListResponse)]
pub struct KvListResponse {
pub entries: Vec<KvEntry>,
}

#[derive(Serialize, ToSchema)]
pub struct KvEntry {
/// Key encoded in base64
pub key: String,
/// Value encoded in base64
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_list",
path = "/actors/{actor_id}/kv/keys",
params(
("actor_id" = Id, Path),
("prefix" = Option<String>, Query, description = "Base64 encoded key prefix to filter by"),
("limit" = Option<usize>, Query, description = "Number of results to return (default 100, max 1000)"),
("reverse" = Option<bool>, Query, description = "Whether to reverse the order"),
),
responses(
(status = 200, body = KvListResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_list(ctx: ApiCtx, path: KvListPath, query: KvListQuery) -> Result<KvListResponse> {
// Parse query parameters
let limit = query.limit.unwrap_or(100).min(1000);
let reverse = query.reverse.unwrap_or(false);

// Build list query
let list_query = if let Some(prefix) = query.prefix {
let prefix_bytes = BASE64_STANDARD
.decode(&prefix)
.context("failed to decode base64 prefix")?;
rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery {
key: prefix_bytes,
})
} else {
rp::KvListQuery::KvListAllQuery
};

// Get the KV entries
let udb = ctx.pools().udb()?;
let (keys, values, metadata) =
actor_kv::list(&*udb, path.actor_id, list_query, reverse, Some(limit)).await?;

// Build response
let entries = keys
.into_iter()
.zip(values.into_iter())
.zip(metadata.into_iter())
.map(|((key, value), meta)| KvEntry {
key: BASE64_STANDARD.encode(&key),
value: BASE64_STANDARD.encode(&value),
update_ts: meta.update_ts,
})
.collect();

Ok(KvListResponse { entries })
}
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod create;
pub mod delete;
pub mod kv_get;
pub mod kv_list;
pub mod list;
pub mod list_names;
120 changes: 120 additions & 0 deletions engine/packages/api-public/src/actors/kv_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use anyhow::Result;
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Path, Query},
};
use rivet_api_util::request_remote_datacenter_raw;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use crate::ctx::ApiCtx;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvListPath {
pub actor_id: Id,
}

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvListQuery {
/// Base64 encoded key prefix to filter by
pub prefix: Option<String>,
/// Number of results to return (default 100, max 1000)
pub limit: Option<usize>,
/// Whether to reverse the order
pub reverse: Option<bool>,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvListResponse)]
pub struct KvListResponse {
pub entries: Vec<KvEntry>,
}

#[derive(Serialize, ToSchema)]
pub struct KvEntry {
/// Key encoded in base64
pub key: String,
/// Value encoded in base64
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_list",
path = "/actors/{actor_id}/kv/keys",
params(
("actor_id" = Id, Path),
("prefix" = Option<String>, Query, description = "Base64 encoded key prefix to filter by"),
("limit" = Option<usize>, Query, description = "Number of results to return (default 100, max 1000)"),
("reverse" = Option<bool>, Query, description = "Whether to reverse the order"),
),
responses(
(status = 200, body = KvListResponse),
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn kv_list(
Extension(ctx): Extension<ApiCtx>,
Path(path): Path<KvListPath>,
Query(query): Query<KvListQuery>,
) -> Response {
match kv_list_inner(ctx, path, query).await {
Ok(response) => response,
Err(err) => ApiError::from(err).into_response(),
}
}

#[tracing::instrument(skip_all)]
async fn kv_list_inner(ctx: ApiCtx, path: KvListPath, query: KvListQuery) -> Result<Response> {
use axum::Json;

ctx.auth().await?;

if path.actor_id.label() == ctx.config().dc_label() {
let peer_path = rivet_api_peer::actors::kv_list::KvListPath {
actor_id: path.actor_id,
};
let peer_query = rivet_api_peer::actors::kv_list::KvListQuery {
prefix: query.prefix,
limit: query.limit,
reverse: query.reverse,
};
let res = rivet_api_peer::actors::kv_list::kv_list(ctx.into(), peer_path, peer_query).await?;

Ok(Json(res).into_response())
} else {
let mut url = format!("/actors/{}/kv/keys", path.actor_id);
let mut query_params = vec![];

if let Some(prefix) = query.prefix {
query_params.push(format!("prefix={}", urlencoding::encode(&prefix)));
}
if let Some(limit) = query.limit {
query_params.push(format!("limit={}", limit));
}
if let Some(reverse) = query.reverse {
query_params.push(format!("reverse={}", reverse));
}

if !query_params.is_empty() {
url.push_str("?");
url.push_str(&query_params.join("&"));
}

request_remote_datacenter_raw(
&ctx,
path.actor_id.label(),
&url,
axum::http::Method::GET,
Option::<&()>::None,
Option::<&()>::None,
)
.await
}
}
1 change: 1 addition & 0 deletions engine/packages/api-public/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod create;
pub mod delete;
pub mod get_or_create;
pub mod kv_get;
pub mod kv_list;
pub mod list;
pub mod list_names;
pub mod utils;
5 changes: 5 additions & 0 deletions engine/packages/api-public/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi
actors::list_names::list_names,
actors::get_or_create::get_or_create,
actors::kv_get::kv_get,
actors::kv_list::kv_list,
runners::list,
runners::list_names,
namespaces::list,
Expand Down Expand Up @@ -94,6 +95,10 @@ pub async fn router(
"/actors/{actor_id}/kv/keys/{key}",
axum::routing::get(actors::kv_get::kv_get),
)
.route(
"/actors/{actor_id}/kv/keys",
axum::routing::get(actors::kv_list::kv_list),
)
// MARK: Runners
.route("/runners", axum::routing::get(runners::list))
.route("/runners/names", axum::routing::get(runners::list_names))
Expand Down
Loading