diff --git a/backend/internal/compilerServer/daemon.go b/backend/internal/compilerServer/daemon.go index 5696437a..3e927db9 100644 --- a/backend/internal/compilerServer/daemon.go +++ b/backend/internal/compilerServer/daemon.go @@ -30,16 +30,7 @@ func (app *CompiledApp) IsAlive() bool { return false } - if !process.IsAlive() { - return false - } - - // Send a ping to the process - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/api/health", process.Port)) - if resp != nil { - resp.Body.Close() - } - return err == nil && resp.StatusCode == http.StatusOK + return process.CheckHealth() } func (app *CompiledApp) keepAlive() { @@ -264,7 +255,7 @@ func (app *CompiledApp) StartServer() error { } // Send a ping to the process - if serverProcess.IsHealthy() { + if serverProcess.CheckHealth() { if atomic.CompareAndSwapInt64(app.keepAliveRunning, 0, 1) { go app.keepAlive() } diff --git a/backend/internal/identity/id.go b/backend/internal/identity/id.go index a964a264..cb8d2022 100644 --- a/backend/internal/identity/id.go +++ b/backend/internal/identity/id.go @@ -35,6 +35,10 @@ func (id Id) String() string { // Cleans inputs and then creates a category from them. If you have a valid category already, // ust path.Join to combine it with another category. +// +// Example: +// identity.Category("app", "appId") -> "/app/appId" +// identity.Category("my", "app", "badString/../") -> "/my/app/badString%2F%2E%2E%2F func Category(ids ...string) string { parts := make([]string, 0, len(ids)) for _, id := range ids { diff --git a/backend/internal/process/handle.go b/backend/internal/process/handle.go index f0991abb..46d631bc 100644 --- a/backend/internal/process/handle.go +++ b/backend/internal/process/handle.go @@ -49,6 +49,13 @@ func (m *ProcessManager) IsAlive(id ProcessId) bool { return r.IsAlive(id) } +func (m *ProcessManager) CheckHealth(id ProcessId) bool { + r := m.ReadHandle() + defer r.Close() + + return r.CheckHealth(id) +} + func (m *ProcessManager) CopyOutData() []Process { r := m.ReadHandle() defer r.Close() diff --git a/backend/internal/process/process.go b/backend/internal/process/process.go index a5db6817..13fa25bd 100644 --- a/backend/internal/process/process.go +++ b/backend/internal/process/process.go @@ -264,8 +264,16 @@ func (r *RHandle) IsAlive(id ProcessId) bool { return process.IsAlive() } -func (proc *Process) IsHealthy() bool { - return proc.IsAlive() && proc.HealthCheck.Check(health.RunningProcessInfo{Pid: proc.Pid, Port: proc.Port}) +func (r *RHandle) CheckHealth(id ProcessId) bool { + process, found := r.FindById(id) + if !found { + return false + } + return process.CheckHealth() +} + +func (proc *Process) CheckHealth() bool { + return proc.HealthCheck.Check(health.RunningProcessInfo{Pid: proc.Pid, Port: proc.Port}) } // This reads the path variable to find the right executable. diff --git a/backend/internal/server/process_rpc.go b/backend/internal/server/process_rpc.go index 16aee83f..c02cf78d 100644 --- a/backend/internal/server/process_rpc.go +++ b/backend/internal/server/process_rpc.go @@ -1,10 +1,89 @@ package server import ( + "net/http" + + "robinplatform.dev/internal/identity" "robinplatform.dev/internal/process" "robinplatform.dev/internal/pubsub" ) +type StartProcessForAppInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` + Command string `json:"command"` + Args []string `json:"args"` +} + +var StartProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ + Name: "StartProcess", + Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } + + processConfig := process.ProcessConfig{ + Command: req.Data.Command, + Args: req.Data.Args, + Id: id, + } + + proc, err := process.Manager.Spawn(processConfig) + if err != nil { + return nil, Errorf(http.StatusInternalServerError, "Failed to spawn new process %s: %s", req.Data.AppId, err) + } + + return map[string]any{ + "processKey": proc.Id, + "pid": proc.Pid, + }, nil + }, +} + +type StopProcessForAppInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` +} + +var StopProcessForApp = AppsRpcMethod[StartProcessForAppInput, map[string]any]{ + Name: "StopProcess", + Run: func(req RpcRequest[StartProcessForAppInput]) (map[string]any, *HttpError) { + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } + + if err := process.Manager.Remove(id); err != nil { + return nil, Errorf(http.StatusInternalServerError, "Failed to kill process %s: %s", req.Data.AppId, err) + } + + return map[string]any{}, nil + }, +} + +type CheckProcessHealthInput struct { + AppId string `json:"appId"` + ProcessKey string `json:"processKey"` +} + +var CheckProcessHealth = AppsRpcMethod[CheckProcessHealthInput, map[string]any]{ + Name: "CheckProcessHealth", + Run: func(req RpcRequest[CheckProcessHealthInput]) (map[string]any, *HttpError) { + id := process.ProcessId{ + Category: identity.Category("app", req.Data.AppId), + Key: req.Data.ProcessKey, + } + + isHealthy := process.Manager.CheckHealth(id) + + return map[string]any{ + "processKey": id, + "isHealthy": isHealthy, + }, nil + }, +} + func PipeTopic[T any](topicId pubsub.TopicId, req *StreamRequest[T, any]) error { sub, err := pubsub.SubscribeAny(&pubsub.Topics, topicId) if err != nil { diff --git a/backend/internal/server/server.go b/backend/internal/server/server.go index 0600c499..ae7ceca9 100644 --- a/backend/internal/server/server.go +++ b/backend/internal/server/server.go @@ -78,6 +78,10 @@ func (server *Server) loadRpcMethods() { CreateTopic.Register(server) PublishTopic.Register(server) + StartProcessForApp.Register(server) + StopProcessForApp.Register(server) + CheckProcessHealth.Register(server) + // Streaming methods wsHandler := &RpcWebsocket{} diff --git a/frontend/components/ProcessDebugger.tsx b/frontend/components/ProcessDebugger.tsx new file mode 100644 index 00000000..0ae30c9d --- /dev/null +++ b/frontend/components/ProcessDebugger.tsx @@ -0,0 +1,104 @@ +import { z } from 'zod'; +import React from 'react'; +import { runRpcQuery, useRpcQuery } from '../hooks/useRpcQuery'; +import { useTopic } from '../../toolkit/react/stream'; +import { ScrollWindow } from './ScrollWindow'; +import toast from 'react-hot-toast'; + +type ProcessInfo = z.infer; +const ProcessInfo = z.object({}); + +type Process = z.infer; +const Process = z.object({ + id: z.object({ + category: z.string(), + key: z.string(), + }), + command: z.string(), + args: z.array(z.string()), +}); + +// This is a temporary bit of code to just display what's in the processes DB +// to make writing other features easier +export function ProcessDebugger() { + const { data: processes = [], error } = useRpcQuery({ + method: 'ListProcesses', + data: {}, + result: z.array(Process), + }); + + const [currentProcess, setCurrentProcess] = React.useState(); + const { state } = useTopic({ + topicId: currentProcess && { + category: `/logs${currentProcess.id.category}`, + key: currentProcess.id.key, + }, + resultType: z.string(), + fetchState: () => + runRpcQuery({ + method: 'GetProcessLogs', + data: { processId: currentProcess?.id }, + result: z.object({ + counter: z.number(), + text: z.string(), + }), + }).then(({ counter, text }) => ({ counter, state: text })), + reducer: (prev, message) => { + return prev + '\n' + message; + }, + }); + + React.useEffect(() => { + if (error) { + toast.error(`${String(error)}`); + } + }, [error]); + + return ( +
+
Processes
+ + + {processes?.map((value) => { + const key = `${value.id.category} ${value.id.key}`; + return ( +
+ {key} + + + +
+								{JSON.stringify(value, null, 2)}
+							
+
+ ); + })} +
+ + +
+					{state}
+				
+
+
+ ); +} diff --git a/frontend/pages/index.tsx b/frontend/pages/index.tsx index c8c2e440..8cd5cb72 100644 --- a/frontend/pages/index.tsx +++ b/frontend/pages/index.tsx @@ -1,105 +1,10 @@ import Head from 'next/head'; import React from 'react'; import { z } from 'zod'; -import { runRpcQuery, useRpcQuery } from '../hooks/useRpcQuery'; -import toast from 'react-hot-toast'; -import { useTopicQuery } from '../../toolkit/react/stream'; +import { runRpcQuery } from '../hooks/useRpcQuery'; +import { useTopic } from '../../toolkit/react/stream'; import { ScrollWindow } from '../components/ScrollWindow'; - -type Process = z.infer; -const Process = z.object({ - id: z.object({ - category: z.string(), - key: z.string(), - }), - command: z.string(), - args: z.array(z.string()), -}); - -// This is a temporary bit of code to just display what's in the processes DB -// to make writing other features easier -function Processes() { - const { data: processes = [], error } = useRpcQuery({ - method: 'ListProcesses', - data: {}, - result: z.array(Process), - }); - - const [currentProcess, setCurrentProcess] = React.useState(); - const { state } = useTopicQuery({ - topicId: currentProcess && { - category: `/logs${currentProcess.id.category}`, - key: currentProcess.id.key, - }, - resultType: z.string(), - fetchState: () => - runRpcQuery({ - method: 'GetProcessLogs', - data: { processId: currentProcess?.id }, - result: z.object({ - counter: z.number(), - text: z.string(), - }), - }).then(({ counter, text }) => ({ counter, state: text })), - reducer: (prev, message) => { - return prev + '\n' + message; - }, - }); - - React.useEffect(() => { - if (error) { - toast.error(`${String(error)}`); - } - }, [error]); - - return ( -
-
Processes
- - - {processes?.map((value) => { - const key = `${value.id.category} ${value.id.key}`; - return ( -
- {key} - - - -
-								{JSON.stringify(value, null, 2)}
-							
-
- ); - })} -
- - -
-					{state}
-				
-
-
- ); -} +import { ProcessDebugger } from '../components/ProcessDebugger'; type TopicId = z.infer; const TopicId = z.object({ @@ -131,7 +36,7 @@ function Topics() { TopicInfo & { key: string } >(); - const { state: topics } = useTopicQuery({ + const { state: topics } = useTopic({ resultType: MetaTopicInfo, topicId: { category: '/topics', @@ -166,10 +71,7 @@ function Topics() { }, }); - const { state: topicMessages } = useTopicQuery< - Record, - unknown - >({ + const { state: topicMessages } = useTopic, unknown>({ resultType: z.unknown(), skip: !selectedTopic?.id, topicId: selectedTopic?.id, @@ -267,7 +169,7 @@ export default function Home() {
- +
diff --git a/toolkit/react/stream.tsx b/toolkit/react/stream.tsx index 1e55e70f..f51eb0ae 100644 --- a/toolkit/react/stream.tsx +++ b/toolkit/react/stream.tsx @@ -15,7 +15,6 @@ const PubsubData = z.object({ // Subscribe to an app topic and track the messages received in relation // to state. export function useAppTopicQuery({ - appId = process.env.ROBIN_APP_ID, category, key, fetchState, @@ -23,7 +22,6 @@ export function useAppTopicQuery({ resultType, skip, }: { - appId?: string; resultType: z.Schema; category?: string[]; key?: string; @@ -31,7 +29,8 @@ export function useAppTopicQuery({ reducer: (s: State, o: Output) => State; skip?: boolean; }) { - return useTopicQueryInternal({ + const appId = process.env.ROBIN_APP_ID; + return useIndexedStream({ methodName: 'SubscribeAppTopic', data: { appId, @@ -47,7 +46,7 @@ export function useAppTopicQuery({ // Subscribe to a topic and track the messages received in relation // to state. -export function useTopicQuery({ +export function useTopic({ topicId, fetchState, reducer, @@ -60,7 +59,7 @@ export function useTopicQuery({ reducer: (s: State, o: Output) => State; skip?: boolean; }) { - return useTopicQueryInternal({ + return useIndexedStream({ methodName: 'SubscribeTopic', data: { id: topicId }, skip: skip || !topicId, @@ -70,9 +69,9 @@ export function useTopicQuery({ }); } -// Subscribe to a topic and track the messages received in relation -// to state. -function useTopicQueryInternal({ +// Read data from a stream and track the stream data in lock-step with +// state fetched from an external source. +export function useIndexedStream({ methodName, data, fetchState,