From cd405510518f7b2db4f9ea03311e1059181d6c5d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 11 Dec 2025 17:01:26 +0000 Subject: [PATCH 1/4] Initial plan From 53e8356819e48bb8fb58a82b219e6013159f9212 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 11 Dec 2025 17:19:48 +0000 Subject: [PATCH 2/4] Add KV list backend endpoints to api-public and api-peer Co-authored-by: jog1t <39823706+jog1t@users.noreply.github.com> --- Cargo.lock | 1 + engine/packages/api-peer/Cargo.toml | 1 + .../packages/api-peer/src/actors/kv_list.rs | 93 ++++++++++++++ engine/packages/api-peer/src/actors/mod.rs | 1 + .../packages/api-public/src/actors/kv_list.rs | 120 ++++++++++++++++++ engine/packages/api-public/src/actors/mod.rs | 1 + engine/packages/api-public/src/router.rs | 5 + 7 files changed, 222 insertions(+) create mode 100644 engine/packages/api-peer/src/actors/kv_list.rs create mode 100644 engine/packages/api-public/src/actors/kv_list.rs diff --git a/Cargo.lock b/Cargo.lock index f4262236de..aa254193cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4259,6 +4259,7 @@ dependencies = [ "rivet-config", "rivet-error", "rivet-pools", + "rivet-runner-protocol", "rivet-types", "rivet-util", "serde", diff --git a/engine/packages/api-peer/Cargo.toml b/engine/packages/api-peer/Cargo.toml index 0f4171d64e..d596007cc9 100644 --- a/engine/packages/api-peer/Cargo.toml +++ b/engine/packages/api-peer/Cargo.toml @@ -18,6 +18,7 @@ rivet-api-util.workspace = true rivet-config.workspace = true rivet-error.workspace = true rivet-pools.workspace = true +rivet-runner-protocol.workspace = true rivet-util.workspace = true rivet-types.workspace = true serde.workspace = true diff --git a/engine/packages/api-peer/src/actors/kv_list.rs b/engine/packages/api-peer/src/actors/kv_list.rs new file mode 100644 index 0000000000..cfcec92138 --- /dev/null +++ b/engine/packages/api-peer/src/actors/kv_list.rs @@ -0,0 +1,93 @@ +use anyhow::*; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; +use pegboard_actor_kv as actor_kv; +use rivet_api_builder::ApiCtx; +use rivet_runner_protocol::mk2 as rp; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct KvListPath { + pub actor_id: Id, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct KvListQuery { + /// Base64 encoded key prefix to filter by + pub prefix: Option, + /// Number of results to return (default 100, max 1000) + pub limit: Option, + /// Whether to reverse the order + pub reverse: Option, +} + +#[derive(Serialize, ToSchema)] +#[schema(as = ActorsKvListResponse)] +pub struct KvListResponse { + pub entries: Vec, +} + +#[derive(Serialize, ToSchema)] +pub struct KvEntry { + /// Key encoded in base64 + pub key: String, + /// Value encoded in base64 + pub value: String, + pub update_ts: i64, +} + +#[utoipa::path( + get, + operation_id = "actors_kv_list", + path = "/actors/{actor_id}/kv/keys", + params( + ("actor_id" = Id, Path), + ("prefix" = Option, Query, description = "Base64 encoded key prefix to filter by"), + ("limit" = Option, Query, description = "Number of results to return (default 100, max 1000)"), + ("reverse" = Option, Query, description = "Whether to reverse the order"), + ), + responses( + (status = 200, body = KvListResponse), + ), +)] +#[tracing::instrument(skip_all)] +pub async fn kv_list(ctx: ApiCtx, path: KvListPath, query: KvListQuery) -> Result { + // Parse query parameters + let limit = query.limit.unwrap_or(100).min(1000); + let reverse = query.reverse.unwrap_or(false); + + // Build list query + let list_query = if let Some(prefix) = query.prefix { + let prefix_bytes = BASE64_STANDARD + .decode(&prefix) + .context("failed to decode base64 prefix")?; + rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + key: prefix_bytes, + }) + } else { + rp::KvListQuery::KvListAllQuery + }; + + // Get the KV entries + let udb = ctx.pools().udb()?; + let (keys, values, metadata) = + actor_kv::list(&*udb, path.actor_id, list_query, reverse, Some(limit)).await?; + + // Build response + let entries = keys + .into_iter() + .zip(values.into_iter()) + .zip(metadata.into_iter()) + .map(|((key, value), meta)| KvEntry { + key: BASE64_STANDARD.encode(&key), + value: BASE64_STANDARD.encode(&value), + update_ts: meta.update_ts, + }) + .collect(); + + Ok(KvListResponse { entries }) +} diff --git a/engine/packages/api-peer/src/actors/mod.rs b/engine/packages/api-peer/src/actors/mod.rs index 9451cd59eb..3f3609f351 100644 --- a/engine/packages/api-peer/src/actors/mod.rs +++ b/engine/packages/api-peer/src/actors/mod.rs @@ -1,5 +1,6 @@ pub mod create; pub mod delete; pub mod kv_get; +pub mod kv_list; pub mod list; pub mod list_names; diff --git a/engine/packages/api-public/src/actors/kv_list.rs b/engine/packages/api-public/src/actors/kv_list.rs new file mode 100644 index 0000000000..68228cf492 --- /dev/null +++ b/engine/packages/api-public/src/actors/kv_list.rs @@ -0,0 +1,120 @@ +use anyhow::Result; +use axum::response::{IntoResponse, Response}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Path, Query}, +}; +use rivet_api_util::request_remote_datacenter_raw; +use rivet_util::Id; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::ctx::ApiCtx; + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct KvListPath { + pub actor_id: Id, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct KvListQuery { + /// Base64 encoded key prefix to filter by + pub prefix: Option, + /// Number of results to return (default 100, max 1000) + pub limit: Option, + /// Whether to reverse the order + pub reverse: Option, +} + +#[derive(Serialize, ToSchema)] +#[schema(as = ActorsKvListResponse)] +pub struct KvListResponse { + pub entries: Vec, +} + +#[derive(Serialize, ToSchema)] +pub struct KvEntry { + /// Key encoded in base64 + pub key: String, + /// Value encoded in base64 + pub value: String, + pub update_ts: i64, +} + +#[utoipa::path( + get, + operation_id = "actors_kv_list", + path = "/actors/{actor_id}/kv/keys", + params( + ("actor_id" = Id, Path), + ("prefix" = Option, Query, description = "Base64 encoded key prefix to filter by"), + ("limit" = Option, Query, description = "Number of results to return (default 100, max 1000)"), + ("reverse" = Option, Query, description = "Whether to reverse the order"), + ), + responses( + (status = 200, body = KvListResponse), + ), + security(("bearer_auth" = [])), +)] +#[tracing::instrument(skip_all)] +pub async fn kv_list( + Extension(ctx): Extension, + Path(path): Path, + Query(query): Query, +) -> Response { + match kv_list_inner(ctx, path, query).await { + Ok(response) => response, + Err(err) => ApiError::from(err).into_response(), + } +} + +#[tracing::instrument(skip_all)] +async fn kv_list_inner(ctx: ApiCtx, path: KvListPath, query: KvListQuery) -> Result { + use axum::Json; + + ctx.auth().await?; + + if path.actor_id.label() == ctx.config().dc_label() { + let peer_path = rivet_api_peer::actors::kv_list::KvListPath { + actor_id: path.actor_id, + }; + let peer_query = rivet_api_peer::actors::kv_list::KvListQuery { + prefix: query.prefix, + limit: query.limit, + reverse: query.reverse, + }; + let res = rivet_api_peer::actors::kv_list::kv_list(ctx.into(), peer_path, peer_query).await?; + + Ok(Json(res).into_response()) + } else { + let mut url = format!("/actors/{}/kv/keys", path.actor_id); + let mut query_params = vec![]; + + if let Some(prefix) = query.prefix { + query_params.push(format!("prefix={}", urlencoding::encode(&prefix))); + } + if let Some(limit) = query.limit { + query_params.push(format!("limit={}", limit)); + } + if let Some(reverse) = query.reverse { + query_params.push(format!("reverse={}", reverse)); + } + + if !query_params.is_empty() { + url.push_str("?"); + url.push_str(&query_params.join("&")); + } + + request_remote_datacenter_raw( + &ctx, + path.actor_id.label(), + &url, + axum::http::Method::GET, + Option::<&()>::None, + Option::<&()>::None, + ) + .await + } +} diff --git a/engine/packages/api-public/src/actors/mod.rs b/engine/packages/api-public/src/actors/mod.rs index d1adaf1d36..1cd4d8c833 100644 --- a/engine/packages/api-public/src/actors/mod.rs +++ b/engine/packages/api-public/src/actors/mod.rs @@ -2,6 +2,7 @@ pub mod create; pub mod delete; pub mod get_or_create; pub mod kv_get; +pub mod kv_list; pub mod list; pub mod list_names; pub mod utils; diff --git a/engine/packages/api-public/src/router.rs b/engine/packages/api-public/src/router.rs index 39a13707c2..d760ee8f8f 100644 --- a/engine/packages/api-public/src/router.rs +++ b/engine/packages/api-public/src/router.rs @@ -19,6 +19,7 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi actors::list_names::list_names, actors::get_or_create::get_or_create, actors::kv_get::kv_get, + actors::kv_list::kv_list, runners::list, runners::list_names, namespaces::list, @@ -94,6 +95,10 @@ pub async fn router( "/actors/{actor_id}/kv/keys/{key}", axum::routing::get(actors::kv_get::kv_get), ) + .route( + "/actors/{actor_id}/kv/keys", + axum::routing::get(actors::kv_list::kv_list), + ) // MARK: Runners .route("/runners", axum::routing::get(runners::list)) .route("/runners/names", axum::routing::get(runners::list_names)) From e99c7fcbf86318cdcd83dd6795abbbb18b61d2f0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 11 Dec 2025 17:21:01 +0000 Subject: [PATCH 3/4] Add KV list route to inspector (partial implementation) Co-authored-by: jog1t <39823706+jog1t@users.noreply.github.com> --- .../packages/rivetkit/src/inspector/actor.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/actor.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/actor.ts index fc080bcc74..361ab9a62b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/actor.ts @@ -301,6 +301,17 @@ interface ActorInspectorAccessors { getRpcs: () => Promise; getConnections: () => Promise; executeAction: (name: string, params?: unknown[]) => Promise; + getKvEntries: (options: { + prefix?: string; + limit?: number; + reverse?: boolean; + }) => Promise< + Array<{ + key: string; + value: string; + updateTs: number; + }> + >; } interface ActorInspectorEmitterEvents { From dc6fcc5f988a889ad946859a98baa489dd64c1ec Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 11 Dec 2025 18:34:05 +0000 Subject: [PATCH 4/4] Add KV inspector UI with search functionality Co-authored-by: jog1t <39823706+jog1t@users.noreply.github.com> --- .../src/components/actors/actor-kv-list.tsx | 136 ++++++++++++++++++ .../src/components/actors/actor-kv-tab.tsx | 44 ++++++ frontend/src/components/actors/actor-kv.tsx | 129 +++++++++++++++++ .../actors/actor-queries-context.tsx | 33 +++++ .../actors/actors-actor-details.tsx | 21 +++ .../rivetkit/src/inspector/protocol/common.ts | 1 + 6 files changed, 364 insertions(+) create mode 100644 frontend/src/components/actors/actor-kv-list.tsx create mode 100644 frontend/src/components/actors/actor-kv-tab.tsx create mode 100644 frontend/src/components/actors/actor-kv.tsx diff --git a/frontend/src/components/actors/actor-kv-list.tsx b/frontend/src/components/actors/actor-kv-list.tsx new file mode 100644 index 0000000000..384d332bc8 --- /dev/null +++ b/frontend/src/components/actors/actor-kv-list.tsx @@ -0,0 +1,136 @@ +import { useQuery } from "@tanstack/react-query"; +import { format } from "date-fns"; +import { type PropsWithChildren, useEffect, useRef } from "react"; +import { useActor } from "./actor-queries-context"; +import { ActorObjectInspector } from "./console/actor-inspector"; +import type { ActorId } from "./queries"; + +interface ActorKvListProps { + actorId: ActorId; + search: string; +} + +export function ActorKvList({ + actorId, + search, +}: ActorKvListProps) { + const actorQueries = useActor(); + const { data, isLoading, isError } = useQuery( + actorQueries.actorKvQueryOptions(actorId), + ); + + if (isLoading) { + return Loading KV entries...; + } + + if (isError) { + return ( + + KV Inspector is currently unavailable. +
+ See console/logs for more details. +
+ ); + } + + const filteredEntries = data?.entries.filter?.((entry) => { + if (!search) return true; + + try { + // Decode base64 key to search in it + const decodedKey = atob(entry.key); + return decodedKey.toLowerCase().includes(search.toLowerCase()); + } catch { + // If decode fails, search in the base64 string itself + return entry.key.toLowerCase().includes(search.toLowerCase()); + } + }); + + if (filteredEntries?.length === 0) { + return No KV entries found.; + } + + return filteredEntries?.map((entry, index) => { + return ; + }); +} + +interface KvEntryProps { + key: string; + value: string; + updateTs: number; +} + +function KvEntry(props: KvEntryProps) { + const ref = useRef(null); + + useEffect(() => { + ref.current?.scrollIntoView({ + behavior: "smooth", + block: "nearest", + }); + }, [props]); + + let decodedKey = props.key; + let decodedValue = props.value; + let parsedValue: unknown = null; + let valueSize = "0 B"; + + try { + decodedKey = atob(props.key); + } catch { + // Keep original if decode fails + } + + try { + const valueBytes = atob(props.value); + decodedValue = valueBytes; + valueSize = formatBytes(valueBytes.length); + + // Try to parse as JSON for better display + try { + parsedValue = JSON.parse(valueBytes); + } catch { + // Not JSON, keep as string + parsedValue = valueBytes; + } + } catch { + // Keep original if decode fails + } + + return ( +
+
+ {format(new Date(props.updateTs), "HH:mm:ss.SSS")} +
+
+ {decodedKey} +
+
+ +
+
+ {valueSize} +
+
+ ); +} + +function Info({ children }: PropsWithChildren) { + return ( +
+ {children} +
+ ); +} + +function formatBytes(bytes: number): string { + if (bytes === 0) return "0 B"; + const k = 1024; + const sizes = ["B", "KB", "MB", "GB"]; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + return `${Number.parseFloat((bytes / k ** i).toFixed(2))} ${sizes[i]}`; +} diff --git a/frontend/src/components/actors/actor-kv-tab.tsx b/frontend/src/components/actors/actor-kv-tab.tsx new file mode 100644 index 0000000000..3374aff62c --- /dev/null +++ b/frontend/src/components/actors/actor-kv-tab.tsx @@ -0,0 +1,44 @@ +import { useQuery } from "@tanstack/react-query"; +import { ActorKv } from "./actor-kv"; +import { useActor } from "./actor-queries-context"; +import { Info } from "./actor-state-tab"; +import { useDataProvider } from "./data-provider"; +import type { ActorId } from "./queries"; + +interface ActorKvTabProps { + actorId: ActorId; +} + +export function ActorKvTab({ actorId }: ActorKvTabProps) { + const { data: destroyedAt } = useQuery( + useDataProvider().actorDestroyedAtQueryOptions(actorId), + ); + + const { isError, isLoading } = useQuery( + useActor().actorKvQueryOptions(actorId), + ); + + if (destroyedAt) { + return ( +
+ KV Inspector is unavailable for inactive Actors. +
+ ); + } + + if (isError) { + return ( + + KV Inspector is currently unavailable. +
+ See console/logs for more details. +
+ ); + } + + if (isLoading) { + return Loading...; + } + + return ; +} diff --git a/frontend/src/components/actors/actor-kv.tsx b/frontend/src/components/actors/actor-kv.tsx new file mode 100644 index 0000000000..04647aef28 --- /dev/null +++ b/frontend/src/components/actors/actor-kv.tsx @@ -0,0 +1,129 @@ +import { useQuery } from "@tanstack/react-query"; +import { + startTransition, + useCallback, + useEffect, + useRef, + useState, +} from "react"; +import { useResizeObserver } from "usehooks-ts"; +import { ScrollArea } from "@/components"; +import { useActorDetailsSettings } from "./actor-details-settings"; +import { ActorKvList } from "./actor-kv-list"; +import { useActor } from "./actor-queries-context"; +import type { ActorId } from "./queries"; + +interface ActorKvProps { + actorId: ActorId; +} + +export function ActorKv({ actorId }: ActorKvProps) { + const [search, setSearch] = useState(""); + const ref = useRef(null); + const [settings] = useActorDetailsSettings(); + + const actorQueries = useActor(); + const { data } = useQuery(actorQueries.actorKvQueryOptions(actorId)); + const { onScroll } = useScrollToBottom(ref, [data]); + + return ( +
+
+
+
+ + startTransition(() => setSearch(e.target.value)) + } + /> +
+
+
+
+ +
+
+
+ Updated At +
+
Key
+
Value
+
Size
+
+ + +
+
+
+
+ ); +} + +function useScrollToBottom( + ref: React.RefObject, + deps: unknown[], +) { + const [settings] = useActorDetailsSettings(); + const follow = useRef(true); + const shouldFollow = () => settings.autoFollowLogs && follow.current; + useResizeObserver({ + // @ts-expect-error -- TS2322 -- Type 'HTMLDivElement' is not assignable to type 'Element | null'. + ref, + onResize: () => { + if (shouldFollow()) { + // https://github.com/TanStack/virtual/issues/537 + requestAnimationFrame(() => { + ref.current?.scrollTo({ + top: ref.current.scrollHeight, + behavior: "smooth", + }); + }); + } + }, + }); + + const onScroll = useCallback((e: React.UIEvent) => { + follow.current = + e.currentTarget.scrollHeight - e.currentTarget.scrollTop <= + e.currentTarget.clientHeight; + }, []); + + useEffect( + () => { + if (!shouldFollow()) { + return () => {}; + } + // https://github.com/TanStack/virtual/issues/537 + const rafId = requestAnimationFrame(() => { + ref.current?.scrollTo({ + top: ref.current.scrollHeight, + behavior: "smooth", + }); + }); + + return () => { + cancelAnimationFrame(rafId); + }; + }, + // biome-ignore lint/correctness/useExhaustiveDependencies: deps is passed from caller + deps, + ); + + return { onScroll }; +} diff --git a/frontend/src/components/actors/actor-queries-context.tsx b/frontend/src/components/actors/actor-queries-context.tsx index c9ef62ab70..3d89d00d1a 100644 --- a/frontend/src/components/actors/actor-queries-context.tsx +++ b/frontend/src/components/actors/actor-queries-context.tsx @@ -220,6 +220,39 @@ export const createDefaultActorContext = ( }; }, + actorKvQueryOptions( + actorId: ActorId, + { enabled, prefix, limit, reverse }: { enabled?: boolean; prefix?: string; limit?: number; reverse?: boolean } = {}, + ) { + return queryOptions({ + enabled: enabled ?? true, + refetchInterval: 1000, + queryKey: [hash, "actor", actorId, "kv", { prefix, limit, reverse }], + queryFn: async ({ queryKey: [, , actorId] }) => { + const client = await this.createActorInspector(actorId); + const params = new URLSearchParams(); + if (prefix) params.append("prefix", prefix); + if (limit) params.append("limit", String(limit)); + if (reverse) params.append("reverse", String(reverse)); + + const response = await client.kv.$get({ + query: Object.fromEntries(params), + }); + + if (!response.ok) { + throw response; + } + return (await response.json()) as { + entries: Array<{ + key: string; + value: string; + updateTs: number; + }>; + }; + }, + }); + }, + actorWakeUpMutationOptions(actorId: ActorId) { return { mutationKey: [hash, "actor", actorId, "wake-up"], diff --git a/frontend/src/components/actors/actors-actor-details.tsx b/frontend/src/components/actors/actors-actor-details.tsx index 4b782e3dd5..1399daa987 100644 --- a/frontend/src/components/actors/actors-actor-details.tsx +++ b/frontend/src/components/actors/actors-actor-details.tsx @@ -14,6 +14,7 @@ import { ActorConnectionsTab } from "./actor-connections-tab"; import { ActorDatabaseTab } from "./actor-db-tab"; import { ActorDetailsSettingsProvider } from "./actor-details-settings"; import { ActorEventsTab } from "./actor-events-tab"; +import { ActorKvTab } from "./actor-kv-tab"; import { ActorLogsTab } from "./actor-logs-tab"; import { ActorMetricsTab } from "./actor-metrics-tab"; import { ActorStateTab } from "./actor-state-tab"; @@ -122,6 +123,7 @@ export function ActorTabs({ const supportsMetrics = features?.includes(ActorFeature.Metrics); const supportsEvents = features?.includes(ActorFeature.EventsMonitoring); const supportsDatabase = features?.includes(ActorFeature.Database); + const supportsKv = features?.includes(ActorFeature.Kv); const defaultTab = supportsState ? "state" : "logs"; const value = disabled ? undefined : tab || defaultTab; @@ -174,6 +176,15 @@ export function ActorTabs({ Database ) : null} + {supportsKv ? ( + + KV + + ) : null} {supportsLogs ? ( ) : null} + {supportsKv ? ( + + {guardContent || ( + + )} + + ) : null} {supportsState ? (