From 64521f25928958c0608c3f399d4678ea265f706e Mon Sep 17 00:00:00 2001 From: Ali Naqvi Date: Wed, 29 Oct 2025 12:03:08 +0800 Subject: [PATCH] feat: [PPT-2272] Add edge error + health reporting feature --- OPENAPI_DOC.yml | 1088 ++++++++++++++++++++-- spec/api/edge_monitoring_spec.cr | 49 + spec/api/status_spec.cr | 74 ++ spec/placeos-edge/client_spec.cr | 64 ++ src/api/edge_monitoring.cr | 201 ++++ src/api/status.cr | 108 +++ src/placeos-core/edge_error.cr | 213 +++++ src/placeos-core/module_manager.cr | 72 ++ src/placeos-core/process_manager/edge.cr | 276 +++++- src/placeos-edge/client.cr | 148 +++ src/placeos-edge/protocol.cr | 18 + src/placeos-edge/server.cr | 75 +- 12 files changed, 2316 insertions(+), 70 deletions(-) create mode 100644 spec/api/edge_monitoring_spec.cr create mode 100644 src/api/edge_monitoring.cr create mode 100644 src/placeos-core/edge_error.cr diff --git a/OPENAPI_DOC.yml b/OPENAPI_DOC.yml index d91d98a8..48264c90 100644 --- a/OPENAPI_DOC.yml +++ b/OPENAPI_DOC.yml @@ -3,8 +3,113 @@ openapi: 3.0.3 info: description: Internal core API. Handles driver management and comms title: core - version: 4.15.1 + version: 4.20.2 paths: + /api/core/v1/build/monitor: + get: + tags: + - BuildMonitor + operationId: PlaceOS::Core::Api::BuildMonitor_monitor + parameters: + - name: state + in: query + description: state of job to return. One of [pending,running,cancelled,error,done]. 
+ Defaults to 'pending' + example: pending + schema: + type: string + enum: + - pending + - running + - cancelled + - error + - done + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/_Array_PlaceOS__Core__DriverStore__TaskStatus____String_' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/build/cancel/{job}: + delete: + tags: + - BuildMonitor + operationId: PlaceOS::Core::Api::BuildMonitor_cancel + parameters: + - name: job + in: path + description: ID of previously submitted compilation job + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__DriverStore__CancelStatus' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: 
Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' /api/core/v1/chaos/terminate: post: summary: Terminate a process by executable path @@ -23,7 +128,6 @@ paths: in: query description: optionally provide the edge id the driver is running on example: edge-12345 - required: false schema: type: string nullable: true @@ -134,7 +238,6 @@ paths: in: query description: the user context for the execution example: user-1234 - required: false schema: type: string nullable: true @@ -448,7 +551,6 @@ paths: in: query description: the branch of the repository example: main - required: false schema: type: string responses: @@ -531,16 +633,63 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - /api/core/v1: + /api/core/v1/monitoring/edge/{edge_id}/errors/stream: get: - summary: route for checking the health of the service + summary: WebSocket endpoint for real-time error monitoring of a specific edge tags: - - Root - operationId: PlaceOS::Core::Api::Root_healthcheck + - EdgeMonitoring + operationId: PlaceOS::Core::Api::EdgeMonitoring_edge_error_stream + parameters: + - name: edge_id + in: path + description: the edge ID to monitor + example: edge-1234 + required: true + schema: + type: string + responses: + 101: + description: Switching Protocols + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: 
'#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/monitoring/edges/errors/stream: + get: + summary: WebSocket endpoint for real-time error monitoring of all edges + tags: + - EdgeMonitoring + operationId: PlaceOS::Core::Api::EdgeMonitoring_all_edges_error_stream parameters: [] responses: - 200: - description: OK + 101: + description: Switching Protocols 404: description: Not Found content: @@ -571,22 +720,66 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - 503: - description: Service Unavailable - /api/core/v1/version: + /api/core/v1/monitoring/edges/modules/stream: get: - summary: returns the build details of the service + summary: WebSocket endpoint for real-time module status monitoring tags: - - Root - operationId: PlaceOS::Core::Api::Root_version + - EdgeMonitoring + operationId: PlaceOS::Core::Api::EdgeMonitoring_edge_modules_stream parameters: [] + responses: + 101: + description: Switching Protocols + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad 
Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/monitoring/cleanup: + post: + summary: REST endpoint to trigger error cleanup + tags: + - EdgeMonitoring + operationId: PlaceOS::Core::Api::EdgeMonitoring_cleanup_errors + parameters: + - name: hours + in: query + description: 'Hours to keep errors (default: 24)' + schema: + type: integer + format: Int32 responses: 200: description: OK content: application/json: schema: - $ref: '#/components/schemas/PlaceOS__Model__Version' + $ref: '#/components/schemas/JSON__Any' 404: description: Not Found content: @@ -617,18 +810,20 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - 503: - description: Service Unavailable - /api/core/v1/ready: + /api/core/v1/monitoring/summary: get: - summary: has the service finished loading + summary: REST endpoint to get error summary tags: - - Root - operationId: PlaceOS::Core::Api::Root_ready + - EdgeMonitoring + operationId: PlaceOS::Core::Api::EdgeMonitoring_error_summary parameters: [] responses: 200: description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/JSON__Any' 404: description: Not Found content: @@ -659,22 +854,16 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - 503: - description: Service Unavailable - /api/core/v1/status: + /api/core/v1: get: - summary: General statistics related to the process + summary: route for checking the health of the service tags: - - Status - operationId: PlaceOS::Core::Api::Status_statistics + - Root + operationId: PlaceOS::Core::Api::Root_healthcheck parameters: [] responses: 200: description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/PlaceOS__Core__Api__Status__Statistics' 404: description: Not Found content: @@ -705,27 +894,22 @@ paths: application/json: schema: $ref: 
'#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - /api/core/v1/status/driver: + 503: + description: Service Unavailable + /api/core/v1/version: get: - summary: details related to a process (+ anything else we can think of) + summary: returns the build details of the service tags: - - Status - operationId: PlaceOS::Core::Api::Status_driver - parameters: - - name: path - in: query - description: the path of the compiled driver - example: /path/to/compiled_driver - required: true - schema: - type: string + - Root + operationId: PlaceOS::Core::Api::Root_version + parameters: [] responses: 200: description: OK content: application/json: schema: - $ref: '#/components/schemas/PlaceOS__Core__Api__Status__DriverStatus' + $ref: '#/components/schemas/PlaceOS__Model__Version' 404: description: Not Found content: @@ -756,20 +940,18 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - /api/core/v1/status/load: + 503: + description: Service Unavailable + /api/core/v1/ready: get: - summary: details about the overall machine load + summary: has the service finished loading tags: - - Status - operationId: PlaceOS::Core::Api::Status_load + - Root + operationId: PlaceOS::Core::Api::Root_ready parameters: [] responses: 200: description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/PlaceOS__Core__Api__Status__MachineLoad' 404: description: Not Found content: @@ -800,13 +982,14 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' - /api/core/v1/status/loaded: + 503: + description: Service Unavailable + /api/core/v1/status: get: - summary: Returns the lists of modules drivers have loaded for this core, and - managed edges + summary: General statistics related to the process tags: - Status - operationId: PlaceOS::Core::Api::Status_loaded + operationId: PlaceOS::Core::Api::Status_statistics parameters: [] responses: 
200: @@ -814,7 +997,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/PlaceOS__Core__Api__Status__LoadedModules' + $ref: '#/components/schemas/PlaceOS__Core__Api__Status__Statistics' 404: description: Not Found content: @@ -845,13 +1028,548 @@ paths: application/json: schema: $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' -components: - schemas: - Bool: - type: boolean - String: - type: string - PlaceOS__Model__Version: + /api/core/v1/status/driver: + get: + summary: details related to a process (+ anything else we can think of) + tags: + - Status + operationId: PlaceOS::Core::Api::Status_driver + parameters: + - name: path + in: query + description: the path of the compiled driver + example: /path/to/compiled_driver + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Status__DriverStatus' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/load: + get: + summary: details about the overall machine load + tags: + - Status + operationId: PlaceOS::Core::Api::Status_load + parameters: [] + responses: + 200: + description: OK + content: + application/json: + 
schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Status__MachineLoad' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/loaded: + get: + summary: Returns the lists of modules drivers have loaded for this core, and + managed edges + tags: + - Status + operationId: PlaceOS::Core::Api::Status_loaded + parameters: [] + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Status__LoadedModules' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: 
'#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edge/{edge_id}/errors: + get: + summary: Get errors for a specific edge + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edge_errors + parameters: + - name: edge_id + in: path + required: true + schema: + type: string + - name: limit + in: query + description: Number of recent errors to return + schema: + type: integer + format: Int32 + - name: type + in: query + description: Error type filter + schema: + type: string + nullable: true + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/PlaceOS__Core__EdgeError' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edge/{edge_id}/modules/status: + get: + summary: Get module status for a specific edge + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edge_module_status + parameters: + - name: edge_id + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__EdgeModuleStatus' + 404: + description: Not Found + content: + application/json: + schema: + $ref: 
'#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edges/health: + get: + summary: Get health status for all edges + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edges_health + parameters: [] + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Hash_String__PlaceOS__Core__EdgeHealth_' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edges/connections: + get: + summary: Get connection status for all edges + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edge_connections + 
parameters: [] + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Hash_String__PlaceOS__Core__ConnectionMetrics_' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edges/errors: + get: + summary: Get errors from all edges + tags: + - Status + operationId: PlaceOS::Core::Api::Status_all_edge_errors + parameters: + - name: limit + in: query + description: Number of recent errors to return per edge + schema: + type: integer + format: Int32 + - name: type + in: query + description: Error type filter + schema: + type: string + nullable: true + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Hash_String__Array_PlaceOS__Core__EdgeError__' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + 
description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edges/modules/failures: + get: + summary: Get module failures from all edges + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edge_module_failures + parameters: [] + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Hash_String__Array_PlaceOS__Core__ModuleInitError__' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + /api/core/v1/status/edges/statistics: + get: + summary: Get overall edge statistics + tags: + - Status + operationId: PlaceOS::Core::Api::Status_edge_statistics + parameters: [] + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Status__EdgeStatistics' + 404: + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__CommonError' + 406: + description: Not Acceptable + content: + 
application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 415: + description: Unsupported Media Type + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ContentError' + 422: + description: Unprocessable Entity + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' + 400: + description: Bad Request + content: + application/json: + schema: + $ref: '#/components/schemas/PlaceOS__Core__Api__Application__ParameterError' +components: + schemas: + _Array_PlaceOS__Core__DriverStore__TaskStatus____String_: + anyOf: + - type: array + items: + type: object + properties: + state: + type: string + enum: + - pending + - running + - cancelled + - error + - done + id: + type: string + message: + type: string + driver: + type: string + repo: + type: string + branch: + type: string + commit: + type: string + timestamp: + type: string + format: date-time + required: + - state + - id + - message + - driver + - repo + - branch + - commit + - timestamp + - type: string + PlaceOS__Core__DriverStore__CancelStatus: + type: object + properties: + status: + type: string + message: + type: string + required: + - status + - message + Bool: + type: boolean + String: + type: string + JSON__Any: + type: object + PlaceOS__Model__Version: type: object properties: service: @@ -1112,6 +1830,244 @@ components: required: - local - edge + PlaceOS__Core__EdgeError: + type: object + properties: + timestamp: + type: string + format: date-time + edge_id: + type: string + error_type: + type: string + enum: + - connection + - module_init + - module_execution + - driver_load + - system_resource + message: + type: string + context: + type: object + additionalProperties: + type: string + severity: + type: string + enum: + - info + - warning + - error + - critical + required: + - timestamp + - edge_id + - error_type + - message + - context + - 
severity + description: Edge-specific error tracking + PlaceOS__Core__EdgeModuleStatus: + type: object + properties: + edge_id: + type: string + total_modules: + type: integer + format: Int32 + running_modules: + type: integer + format: Int32 + failed_modules: + type: array + items: + type: string + initialization_errors: + type: array + items: + type: object + properties: + module_id: + type: string + driver_key: + type: string + error_message: + type: string + timestamp: + type: string + format: date-time + retry_count: + type: integer + format: Int32 + required: + - module_id + - driver_key + - error_message + - timestamp + - retry_count + required: + - edge_id + - total_modules + - running_modules + - failed_modules + - initialization_errors + description: Edge module status aggregation + Hash_String__PlaceOS__Core__EdgeHealth_: + type: object + additionalProperties: + type: object + properties: + edge_id: + type: string + connected: + type: boolean + last_seen: + type: string + format: date-time + connection_uptime: + type: object + error_count_24h: + type: integer + format: Int32 + module_count: + type: integer + format: Int32 + failed_modules: + type: array + items: + type: string + required: + - edge_id + - connected + - last_seen + - connection_uptime + - error_count_24h + - module_count + - failed_modules + Hash_String__PlaceOS__Core__ConnectionMetrics_: + type: object + additionalProperties: + type: object + properties: + edge_id: + type: string + total_connections: + type: integer + format: Int32 + failed_connections: + type: integer + format: Int32 + average_uptime: + type: object + last_connection_attempt: + type: string + format: date-time + last_successful_connection: + type: string + format: date-time + required: + - edge_id + - total_connections + - failed_connections + - average_uptime + - last_connection_attempt + - last_successful_connection + Hash_String__Array_PlaceOS__Core__EdgeError__: + type: object + additionalProperties: + type: array + 
items: + type: object + properties: + timestamp: + type: string + format: date-time + edge_id: + type: string + error_type: + type: string + enum: + - connection + - module_init + - module_execution + - driver_load + - system_resource + message: + type: string + context: + type: object + additionalProperties: + type: string + severity: + type: string + enum: + - info + - warning + - error + - critical + required: + - timestamp + - edge_id + - error_type + - message + - context + - severity + Hash_String__Array_PlaceOS__Core__ModuleInitError__: + type: object + additionalProperties: + type: array + items: + type: object + properties: + module_id: + type: string + driver_key: + type: string + error_message: + type: string + timestamp: + type: string + format: date-time + retry_count: + type: integer + format: Int32 + required: + - module_id + - driver_key + - error_message + - timestamp + - retry_count + PlaceOS__Core__Api__Status__EdgeStatistics: + type: object + properties: + total_edges: + type: integer + format: Int32 + connected_edges: + type: integer + format: Int32 + disconnected_edges: + type: integer + format: Int32 + total_errors_24h: + type: integer + format: Int32 + total_modules: + type: integer + format: Int32 + failed_modules: + type: integer + format: Int32 + average_uptime: + type: object + required: + - total_edges + - connected_edges + - disconnected_edges + - total_errors_24h + - total_modules + - failed_modules + - average_uptime PlaceOS__Core__Api__Application__CommonError: type: object properties: diff --git a/spec/api/edge_monitoring_spec.cr b/spec/api/edge_monitoring_spec.cr new file mode 100644 index 00000000..05ef552e --- /dev/null +++ b/spec/api/edge_monitoring_spec.cr @@ -0,0 +1,49 @@ +require "../helper" + +module PlaceOS::Core::Api + describe EdgeMonitoring, tags: "api" do + client = AC::SpecHelper.client + + namespace = EdgeMonitoring::NAMESPACE[0] + json_headers = HTTP::Headers{ + "Content-Type" => "application/json", + } + + describe 
"POST /cleanup" do + it "responds with success message" do + response = client.post("#{namespace}cleanup", headers: json_headers, body: {hours: 24}.to_json) + response.status_code.should eq 200 + + result = JSON.parse(response.body) + result["success"].as_bool.should be_true + result["message"].as_s.should contain("24 hours") + result["timestamp"].as_s.should_not be_empty + end + + it "accepts custom hours parameter" do + response = client.post("#{namespace}cleanup?hours=48", headers: json_headers) + response.status_code.should eq 200 + + result = JSON.parse(response.body) + result["success"].as_bool.should be_true + result["message"].as_s.should contain("48 hours") + end + end + + describe "GET /summary" do + it "returns error summary statistics" do + response = client.get("#{namespace}summary", headers: json_headers) + response.status_code.should eq 200 + + result = JSON.parse(response.body) + result["total_edges"].as_i.should be >= 0 + result["connected_edges"].as_i.should be >= 0 + result["edges_with_errors"].as_i.should be >= 0 + result["total_errors_24h"].as_i.should be >= 0 + result["total_modules"].as_i.should be >= 0 + result["failed_modules"].as_i.should be >= 0 + result["timestamp"].as_s.should_not be_empty + end + end + end +end diff --git a/spec/api/status_spec.cr b/spec/api/status_spec.cr index 95d1dadc..9590f152 100644 --- a/spec/api/status_spec.cr +++ b/spec/api/status_spec.cr @@ -32,5 +32,79 @@ module PlaceOS::Core::Api pending "status/driver" pending "status/load" + + describe "edge endpoints" do + it "GET /edge/:edge_id/errors returns errors for specific edge" do + edge_id = "test-edge-1" + + response = client.get("#{namespace}edge/#{edge_id}/errors", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON array + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + result.as_a.should be_a(Array(JSON::Any)) + end + + it "GET /edge/:edge_id/modules/status returns module status for specific 
edge" do + edge_id = "test-edge-1" + + response = client.get("#{namespace}edge/#{edge_id}/modules/status", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON and has expected fields + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + result["edge_id"].as_s.should eq edge_id + end + + it "GET /edges/health returns health status for all edges" do + response = client.get("#{namespace}edges/health", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON and has the expected structure + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + end + + it "GET /edges/connections returns connection metrics for all edges" do + response = client.get("#{namespace}edges/connections", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + end + + it "GET /edges/errors returns errors from all edges" do + response = client.get("#{namespace}edges/errors", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + end + + it "GET /edges/modules/failures returns module failures from all edges" do + response = client.get("#{namespace}edges/modules/failures", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + end + + it "GET /edges/statistics returns overall edge statistics" do + response = client.get("#{namespace}edges/statistics", headers: json_headers) + response.status_code.should eq 200 + + # Just verify it's valid JSON and has expected fields + result = JSON.parse(response.body) + result.should be_a(JSON::Any) + result["total_edges"].as_i.should be >= 0 + result["connected_edges"].as_i.should be >= 0 + result["disconnected_edges"].as_i.should 
be >= 0 + end + end end end diff --git a/spec/placeos-edge/client_spec.cr b/spec/placeos-edge/client_spec.cr index 62b14b80..5cb64dc2 100644 --- a/spec/placeos-edge/client_spec.cr +++ b/spec/placeos-edge/client_spec.cr @@ -48,5 +48,69 @@ module PlaceOS::Edge raise "timed out" end end + + it "sends error and health reports" do + client = Client.new + client_ws, server_ws = mock_sockets + + error_reports = Channel(Protocol::Message::ErrorReport).new + health_reports = Channel(Protocol::Message::HealthReport).new + + server_ws.on_message do |m| + message = Protocol::Text.from_json(m) + case message.body + when Protocol::Message::Register + # Respond to registration + server_ws.send(Protocol::Text.new(message.sequence_id, Protocol::Message::RegisterResponse.new(true)).to_json) + when Protocol::Message::ErrorReport + error_reports.send(message.body.as(Protocol::Message::ErrorReport)) + server_ws.send(Protocol::Text.new(message.sequence_id, Protocol::Message::Success.new(true)).to_json) + when Protocol::Message::HealthReport + health_reports.send(message.body.as(Protocol::Message::HealthReport)) + server_ws.send(Protocol::Text.new(message.sequence_id, Protocol::Message::Success.new(true)).to_json) + end + end + + spawn { server_ws.run } + + spawn { + client.connect(client_ws) do + # Track an error to trigger immediate reporting + client.track_error( + PlaceOS::Core::ErrorType::ModuleExecution, + "Test error message", + {"test" => "context"}, + PlaceOS::Core::Severity::Critical + ) + end + } + + Fiber.yield + sleep 0.1 # Give time for immediate error report + + # Should receive an immediate error report for critical error + select + when error_report = error_reports.receive + error_report.edge_id.should_not be_empty + error_report.errors.size.should eq(1) + + # Parse the error JSON + error_json = error_report.errors.first + error = PlaceOS::Core::EdgeError.from_json(error_json) + error.error_type.should eq(PlaceOS::Core::ErrorType::ModuleExecution) + error.message.should 
eq("Test error message")
+        error.severity.should eq(PlaceOS::Core::Severity::Critical)
+      when timeout 2.seconds
+        raise "timed out waiting for error report"
+      end
+
+      # Should also receive periodic health reports (though we won't wait for the full interval)
+      # We can test the health report generation directly
+      health = client.get_edge_health
+      health.edge_id.should_not be_empty
+      health.connected.should be_true
+      health.module_count.should eq(0) # No modules loaded in test
+      health.error_count_24h.should eq(1) # One error tracked
+    end
   end
 end
diff --git a/src/api/edge_monitoring.cr b/src/api/edge_monitoring.cr
new file mode 100644
index 00000000..58535f3d
--- /dev/null
+++ b/src/api/edge_monitoring.cr
@@ -0,0 +1,201 @@
+require "json"
+require "http/web_socket"
+
+require "../placeos-core/module_manager"
+require "../placeos-core/edge_error"
+require "./application"
+
+module PlaceOS::Core::Api
+  class EdgeMonitoring < Application
+    base "/api/core/v1/monitoring/"
+
+    getter module_manager : ModuleManager { ModuleManager.instance }
+
+    # WebSocket endpoint for real-time error monitoring of a specific edge
+    @[AC::Route::WebSocket("/edge/:edge_id/errors/stream")]
+    def edge_error_stream(
+      socket,
+      @[AC::Param::Info(description: "the edge ID to monitor", example: "edge-1234")]
+      edge_id : String,
+    ) : Nil
+      Log.info { {message: "edge error stream connected", edge_id: edge_id} }
+
+      # Send (at most) the 10 newest errors. Frames are built with `to_json` so `edge_id` is escaped.
+      edge_errors_hash = module_manager.edge_errors(edge_id)
+      initial_errors = edge_errors_hash[edge_id]? || [] of PlaceOS::Core::EdgeError
+      recent_errors = initial_errors.size > 10 ? initial_errors[-10..-1] : initial_errors
+
+      socket.send({
+        type:    "initial_errors",
+        edge_id: edge_id,
+        errors:  recent_errors,
+      }.to_json)
+
+      # Keep connection alive and send periodic updates
+      spawn do
+        loop do
+          sleep 5.seconds
+          break if socket.closed?
+
+          begin
+            # Send current health status (skipped when no health entry exists for this edge)
+            health = module_manager.edge_health_status[edge_id]?
+            if health
+              socket.send({
+                type:    "health_update",
+                edge_id: edge_id,
+                health:  health,
+              }.to_json)
+            end
+
+            # Send recent errors (last 5 seconds worth, matching the poll interval above)
+            edge_errors_hash = module_manager.edge_errors(edge_id)
+            recent_errors = edge_errors_hash[edge_id]? || [] of PlaceOS::Core::EdgeError
+            new_errors = recent_errors.select { |error| error.timestamp > Time.utc - 5.seconds }
+
+            if !new_errors.empty?
+              socket.send({
+                type:    "new_errors",
+                edge_id: edge_id,
+                errors:  new_errors,
+              }.to_json)
+            end
+          rescue e
+            Log.error(exception: e) { "error in edge error stream" }
+            break
+          end
+        end
+      end
+
+      socket.on_close do
+        Log.info { {message: "edge error stream disconnected", edge_id: edge_id} }
+      end
+    end
+
+    # WebSocket endpoint for real-time error monitoring of all edges
+    @[AC::Route::WebSocket("/edges/errors/stream")]
+    def all_edges_error_stream(socket) : Nil
+      Log.info { "all edges error stream connected" }
+
+      # Send initial state
+      initial_health = module_manager.edge_health_status
+      socket.send(%({
+        "type": "initial_health",
+        "edges": #{initial_health.to_json}
+      }))
+
+      # Keep connection alive and send periodic updates
+      spawn do
+        loop do
+          sleep 3.seconds
+          break if socket.closed?
+
+          begin
+            # Send health updates for all edges
+            health_status = module_manager.edge_health_status
+            socket.send(%({
+              "type": "health_update_all",
+              "timestamp": "#{Time.utc}",
+              "edges": #{health_status.to_json}
+            }))
+
+            # Send recent errors from all edges
+            all_errors = module_manager.edge_errors
+            recent_errors = {} of String => Array(PlaceOS::Core::EdgeError)
+
+            all_errors.each do |edge_id, errors|
+              new_errors = errors.select { |error| error.timestamp > Time.utc - 3.seconds }
+              recent_errors[edge_id] = new_errors unless new_errors.empty?
+            end
+
+            if !recent_errors.empty?
+ socket.send(%({ + "type": "new_errors_all", + "timestamp": "#{Time.utc}", + "edges": #{recent_errors.to_json} + })) + end + rescue e + Log.error(exception: e) { "error in all edges error stream" } + break + end + end + end + + socket.on_close do + Log.info { "all edges error stream disconnected" } + end + end + + # WebSocket endpoint for real-time module status monitoring + @[AC::Route::WebSocket("/edges/modules/stream")] + def edge_modules_stream(socket) : Nil + Log.info { "edge modules stream connected" } + + # Send initial module status + initial_status = module_manager.edge_module_status + socket.send(%({ + "type": "initial_modules", + "edges": #{initial_status.to_json} + })) + + # Keep connection alive and send periodic updates + spawn do + loop do + sleep 10.seconds + break if socket.closed? + + begin + # Send module status updates + module_status = module_manager.edge_module_status + socket.send(%({ + "type": "module_update_all", + "timestamp": "#{Time.utc}", + "edges": #{module_status.to_json} + })) + rescue e + Log.error(exception: e) { "error in edge modules stream" } + break + end + end + end + + socket.on_close do + Log.info { "edge modules stream disconnected" } + end + end + + # REST endpoint to trigger error cleanup + @[AC::Route::POST("/cleanup")] + def cleanup_errors( + @[AC::Param::Info(description: "Hours to keep errors (default: 24)")] + hours : Int32 = 24, + ) : JSON::Any + older_than = hours.hours + module_manager.cleanup_old_edge_errors(older_than) + + JSON::Any.new({ + "success" => JSON::Any.new(true), + "message" => JSON::Any.new("Cleaned up errors older than #{hours} hours"), + "timestamp" => JSON::Any.new(Time.utc.to_s), + }) + end + + # REST endpoint to get error summary + @[AC::Route::GET("/summary")] + def error_summary : JSON::Any + health_status = module_manager.edge_health_status + connection_status = module_manager.edge_connection_status + + JSON::Any.new({ + "total_edges" => JSON::Any.new(health_status.size.to_i64), + 
"connected_edges" => JSON::Any.new(connection_status.count { |_, connected| connected }.to_i64), + "edges_with_errors" => JSON::Any.new(health_status.count { |_, health| health.error_count_24h > 0 }.to_i64), + "total_errors_24h" => JSON::Any.new(health_status.sum { |_, health| health.error_count_24h }.to_i64), + "total_modules" => JSON::Any.new(health_status.sum { |_, health| health.module_count }.to_i64), + "failed_modules" => JSON::Any.new(health_status.sum { |_, health| health.failed_modules.size }.to_i64), + "timestamp" => JSON::Any.new(Time.utc.to_s), + }) + end + end +end diff --git a/src/api/status.cr b/src/api/status.cr index f21a6768..37ed9849 100644 --- a/src/api/status.cr +++ b/src/api/status.cr @@ -2,6 +2,7 @@ require "hardware" require "../placeos-core/module_manager" require "../placeos-core/resource_manager" +require "../placeos-core/edge_error" require "./application" module PlaceOS::Core::Api @@ -73,5 +74,112 @@ module PlaceOS::Core::Api edge: module_manager.edge_processes.loaded_modules, ) end + + # Edge Error Monitoring Endpoints + ############################################################################################### + + # Get errors for a specific edge + @[AC::Route::GET("/edge/:edge_id/errors")] + def edge_errors( + edge_id : String, + @[AC::Param::Info(description: "Number of recent errors to return")] + limit : Int32 = 50, + @[AC::Param::Info(description: "Error type filter")] + type : String? = nil, + ) : Array(Core::EdgeError) + errors = module_manager.edge_errors(edge_id)[edge_id]? || [] of Core::EdgeError + + # Filter by type if specified + if type + error_type = Core::ErrorType.parse?(type.camelcase) + errors = errors.select(&.error_type.==(error_type)) if error_type + end + + errors.last(limit) + end + + # Get module status for a specific edge + @[AC::Route::GET("/edge/:edge_id/modules/status")] + def edge_module_status(edge_id : String) : Core::EdgeModuleStatus + module_manager.edge_module_status[edge_id]? 
|| Core::EdgeModuleStatus.new(edge_id) + end + + # Get health status for all edges + @[AC::Route::GET("/edges/health")] + def edges_health : Hash(String, Core::EdgeHealth) + module_manager.edge_health_status + end + + # Get connection status for all edges + @[AC::Route::GET("/edges/connections")] + def edge_connections : Hash(String, Core::ConnectionMetrics) + module_manager.edge_connection_metrics + end + + # Get errors from all edges + @[AC::Route::GET("/edges/errors")] + def all_edge_errors( + @[AC::Param::Info(description: "Number of recent errors to return per edge")] + limit : Int32 = 50, + @[AC::Param::Info(description: "Error type filter")] + type : String? = nil, + ) : Hash(String, Array(Core::EdgeError)) + all_errors = module_manager.edge_errors + + # Filter by type if specified + if type + error_type = Core::ErrorType.parse?(type.camelcase) + if error_type + all_errors = all_errors.transform_values do |errors| + errors.select(&.error_type.==(error_type)) + end + end + end + + # Apply limit to each edge + all_errors.transform_values(&.last(limit)) + end + + # Get module failures from all edges + @[AC::Route::GET("/edges/modules/failures")] + def edge_module_failures : Hash(String, Array(Core::ModuleInitError)) + module_manager.edge_module_status.transform_values(&.initialization_errors) + end + + # Get overall edge statistics + @[AC::Route::GET("/edges/statistics")] + def edge_statistics : EdgeStatistics + health_status = module_manager.edge_health_status + + total_edges = health_status.size + connected_edges = health_status.count { |_, health| health.connected } + total_errors_24h = health_status.sum { |_, health| health.error_count_24h } + total_modules = health_status.sum { |_, health| health.module_count } + failed_modules = health_status.sum { |_, health| health.failed_modules.size } + + EdgeStatistics.new( + total_edges: total_edges, + connected_edges: connected_edges, + disconnected_edges: total_edges - connected_edges, + total_errors_24h: 
total_errors_24h, + total_modules: total_modules, + failed_modules: failed_modules, + average_uptime: health_status.values.map(&.connection_uptime).sum / Math.max(1, total_edges) + ) + end + + record EdgeStatistics, + total_edges : Int32, + connected_edges : Int32, + disconnected_edges : Int32, + total_errors_24h : Int32, + total_modules : Int32, + failed_modules : Int32, + average_uptime : Time::Span do + include JSON::Serializable + + @[JSON::Field(converter: PlaceOS::Core::TimeSpanConverter)] + getter average_uptime : Time::Span + end end end diff --git a/src/placeos-core/edge_error.cr b/src/placeos-core/edge_error.cr new file mode 100644 index 00000000..d4b4ef44 --- /dev/null +++ b/src/placeos-core/edge_error.cr @@ -0,0 +1,213 @@ +require "json" + +module PlaceOS::Core + # Edge-specific error tracking + record EdgeError, + timestamp : Time, + edge_id : String, + error_type : ErrorType, + message : String, + context : Hash(String, String), + severity : Severity do + include JSON::Serializable + + @[JSON::Field(converter: Time::EpochConverter)] + getter timestamp : Time + + def initialize(@edge_id, @error_type, @message, @context = {} of String => String, @severity = Severity::Error) + @timestamp = Time.utc + end + end + + enum ErrorType + Connection + ModuleInit + ModuleExecution + DriverLoad + SystemResource + + def to_json(json : JSON::Builder) + json.string(to_s.underscore) + end + end + + enum Severity + Info + Warning + Error + Critical + end + + # Module initialization tracking + record ModuleInitError, + module_id : String, + driver_key : String, + error_message : String, + timestamp : Time, + retry_count : Int32 do + include JSON::Serializable + + @[JSON::Field(converter: Time::EpochConverter)] + getter timestamp : Time + + def initialize(@module_id, @driver_key, @error_message, @retry_count = 0) + @timestamp = Time.utc + end + end + + # Edge health status + class EdgeHealth + include JSON::Serializable + + getter edge_id : String + getter connected : 
Bool + + @[JSON::Field(converter: Time::EpochConverter)] + getter last_seen : Time + + @[JSON::Field(converter: PlaceOS::Core::TimeSpanConverter)] + getter connection_uptime : Time::Span + + getter error_count_24h : Int32 + getter module_count : Int32 + getter failed_modules : Array(String) + + def initialize(@edge_id : String, @connected : Bool = false, @module_count : Int32 = 0, @failed_modules : Array(String) = [] of String) + @last_seen = Time.utc + @connection_uptime = Time::Span.zero + @error_count_24h = 0 + end + + def initialize(@edge_id : String, @connected : Bool, @last_seen : Time, @connection_uptime : Time::Span, @error_count_24h : Int32, @module_count : Int32, @failed_modules : Array(String)) + end + + def copy_with( + edge_id : String? = nil, + connected : Bool? = nil, + last_seen : Time? = nil, + connection_uptime : Time::Span? = nil, + error_count_24h : Int32? = nil, + module_count : Int32? = nil, + failed_modules : Array(String)? = nil, + ) + EdgeHealth.new( + edge_id || @edge_id, + connected.nil? ? 
@connected : connected, + last_seen || @last_seen, + connection_uptime || @connection_uptime, + error_count_24h || @error_count_24h, + module_count || @module_count, + failed_modules || @failed_modules + ) + end + end + + # Connection metrics for edges + record ConnectionMetrics, + edge_id : String, + total_connections : Int32, + failed_connections : Int32, + average_uptime : Time::Span, + last_connection_attempt : Time, + last_successful_connection : Time do + include JSON::Serializable + + @[JSON::Field(converter: Time::EpochConverter)] + getter last_connection_attempt : Time + + @[JSON::Field(converter: Time::EpochConverter)] + getter last_successful_connection : Time + + @[JSON::Field(converter: PlaceOS::Core::TimeSpanConverter)] + getter average_uptime : Time::Span + + def initialize(@edge_id) + @total_connections = 0 + @failed_connections = 0 + @average_uptime = Time::Span.zero + @last_connection_attempt = Time.utc + @last_successful_connection = Time.utc + end + end + + # Edge module status aggregation + record EdgeModuleStatus, + edge_id : String, + total_modules : Int32, + running_modules : Int32, + failed_modules : Array(String), + initialization_errors : Array(ModuleInitError) do + include JSON::Serializable + + def initialize(@edge_id, @total_modules = 0, @running_modules = 0, @failed_modules = [] of String, @initialization_errors = [] of ModuleInitError) + end + end + + # Connection history for tracking edge connectivity + record ConnectionHistory, + edge_id : String, + connection_events : Array(ConnectionEvent) do + include JSON::Serializable + + def initialize(@edge_id, @connection_events = [] of ConnectionEvent) + end + end + + record ConnectionEvent, + timestamp : Time, + event_type : ConnectionEventType, + duration : Time::Span?, + error_message : String? 
do
+    include JSON::Serializable
+
+    @[JSON::Field(converter: Time::EpochConverter)]
+    getter timestamp : Time
+
+    @[JSON::Field(converter: PlaceOS::Core::TimeSpanConverterOptional)]
+    getter duration : Time::Span?
+
+    def initialize(@event_type, @error_message = nil, @duration = nil)
+      @timestamp = Time.utc
+    end
+  end
+
+  enum ConnectionEventType
+    Connected
+    Disconnected
+    Reconnected
+    Failed
+
+    def to_json(json : JSON::Builder)
+      json.string(to_s.underscore)
+    end
+  end
+
+  module TimeSpanConverter
+    def self.from_json(pull : JSON::PullParser) : Time::Span
+      (pull.read_int).seconds
+    end
+
+    def self.to_json(value : Time::Span, json : JSON::Builder) : Nil
+      json.number(value.total_seconds.to_i64)
+    end
+  end
+
+  module TimeSpanConverterOptional
+    def self.from_json(pull : JSON::PullParser) : Time::Span?
+      # JSON null decodes to nil, an integer decodes to that many whole seconds
+      # (the inverse of `to_json` below). The previous `read_raw` returned a
+      # String, so its `is_a?(Int)` check never matched and every value became nil.
+      pull.read_null_or do
+        pull.read_int.seconds
+      end
+    end
+
+    def self.to_json(span : Time::Span?, json : JSON::Builder) : Nil
+      if value = span
+        json.number(value.total_seconds.to_i64)
+      else
+        json.null
+      end
+    end
+  end
+end
diff --git a/src/placeos-core/module_manager.cr b/src/placeos-core/module_manager.cr
index 06951e8e..a59a1be4 100644
--- a/src/placeos-core/module_manager.cr
+++ b/src/placeos-core/module_manager.cr
@@ -15,6 +15,7 @@ require "./process_check"
 require "./process_manager/edge"
 require "./process_manager/local"
 require "./driver_manager"
+require "./edge_error"
 
 module PlaceOS::Core
   class ModuleManager < Resource(Model::Module)
@@ -80,6 +81,7 @@ module PlaceOS::Core
     def start(timeout : Time::Span = LOAD_TIMEOUT)
       start_clustering
       start_process_check
+      start_error_cleanup
 
       stabilize_lock.synchronize do
         super(timeout)
@@ -433,5 +435,75 @@ module PlaceOS::Core
     def self.needs_restart?(mod : Model::Module) : Bool
       mod.ip_changed? || mod.port_changed? || mod.tls_changed? || mod.udp_changed? || mod.makebreak_changed? || mod.uri_changed?
end + + # Edge Error Aggregation Methods + ############################################################################################### + + # Get errors from all edges or a specific edge + def edge_errors(edge_id : String? = nil) : Hash(String, Array(PlaceOS::Core::EdgeError)) + if edge_id + errors = [] of PlaceOS::Core::EdgeError + edge_processes.for?(edge_id) { |manager| errors = manager.get_recent_errors } + {edge_id => errors} + else + edge_processes.collect_edge_errors + end + end + + # Get module failures from all edges or a specific edge + def edge_module_failures(edge_id : String? = nil) : Hash(String, Array(PlaceOS::Core::ModuleError)) + if edge_id + failures = edge_processes.edge_module_failures(edge_id) + {edge_id => failures.values.flatten.map { |init_error| + PlaceOS::Core::ModuleError.new("Module initialization failed: #{init_error.error_message}") + }} + else + edge_processes.edge_module_status.transform_values do |status| + status.initialization_errors.map { |init_error| + PlaceOS::Core::ModuleError.new("Module initialization failed: #{init_error.error_message}") + } + end + end + end + + # Get connection status for all edges + def edge_connection_status : Hash(String, Bool) + edge_processes.edge_connection_status + end + + # Get health status for all edges + def edge_health_status : Hash(String, PlaceOS::Core::EdgeHealth) + edge_processes.edge_health_status + end + + # Get module status for all edges + def edge_module_status : Hash(String, PlaceOS::Core::EdgeModuleStatus) + edge_processes.edge_module_status + end + + # Get connection metrics for all edges + def edge_connection_metrics : Hash(String, PlaceOS::Core::ConnectionMetrics) + edge_processes.edge_connection_metrics + end + + # Cleanup old errors across all edges + def cleanup_old_edge_errors(older_than : Time::Span = 24.hours) + edge_processes.cleanup_old_errors(older_than) + end + + # Start periodic error cleanup task + protected def start_error_cleanup + spawn(name: 
"edge_error_cleanup") do + loop do + sleep 1.hour + begin + cleanup_old_edge_errors + Log.debug { "completed periodic edge error cleanup" } + rescue e + Log.error(exception: e) { "error during periodic edge error cleanup" } + end + end + end + end end end diff --git a/src/placeos-core/process_manager/edge.cr b/src/placeos-core/process_manager/edge.cr index 659f9bc1..b00ad2f9 100644 --- a/src/placeos-core/process_manager/edge.cr +++ b/src/placeos-core/process_manager/edge.cr @@ -2,12 +2,15 @@ require "placeos-driver/protocol/management" require "redis-cluster" require "../process_manager" +require "./common" +require "../edge_error" require "../../placeos-edge/transport" require "../../placeos-edge/protocol" module PlaceOS::Core class ProcessManager::Edge include ProcessManager + include Common alias Transport = PlaceOS::Edge::Transport alias Protocol = PlaceOS::Edge::Protocol @@ -17,17 +20,31 @@ module PlaceOS::Core protected getter(store : DriverStore) { DriverStore.new } + # Error tracking + private getter recent_errors = Deque(EdgeError).new(100) + private getter module_init_failures = Hash(String, Array(ModuleInitError)).new { |h, k| h[k] = [] of ModuleInitError } + private getter connection_health : EdgeHealth + private getter error_lock = Mutex.new + private getter connection_start_time : Time + def initialize(@edge_id : String, socket : HTTP::WebSocket) + @connection_start_time = Time.utc + @connection_health = EdgeHealth.new(@edge_id, connected: true) + @transport = Transport.new do |(sequence_id, request)| if request.is_a?(Protocol::Client::Request) handle_request(sequence_id, request) else Log.error { {message: "unexpected edge request", request: request.to_json} } + track_error(ErrorType::Connection, "Unexpected edge request: #{request.to_json}") end end spawn { transport.listen(socket) } Fiber.yield + + # Track successful connection + track_connection_event(ConnectionEventType::Connected) end def handle_request(sequence_id : UInt64, request : 
Protocol::Client::Request) @@ -61,12 +78,25 @@ module PlaceOS::Core setting_value: YAML.parse(request.setting_value) ) end + when Protocol::Message::ErrorReport + boolean_response(sequence_id, request) do + # Process error reports from edge + process_edge_error_report(request) + true + end + when Protocol::Message::HealthReport + boolean_response(sequence_id, request) do + # Process health reports from edge + process_edge_health_report(request) + true + end end rescue e Log.error(exception: e) { { message: "failed to handle edge request", request: request.to_json, } } + track_error(ErrorType::ModuleExecution, e.message || "Unknown error", {"sequence_id" => sequence_id.to_s, "request_type" => request.type.to_s}) end def execute(module_id : String, payload : String, user_id : String?) @@ -89,7 +119,18 @@ module PlaceOS::Core end def load(module_id : String, driver_key : String) - !!Protocol.request(Protocol::Message::Load.new(module_id, ProcessManager.path_to_key(driver_key)), expect: Protocol::Message::Success) + success = !!Protocol.request(Protocol::Message::Load.new(module_id, ProcessManager.path_to_key(driver_key)), expect: Protocol::Message::Success) + + unless success + error = ModuleInitError.new(module_id, driver_key, "Failed to load module") + track_module_init_error(error) + track_error(ErrorType::ModuleInit, "Failed to load module #{module_id}", {"driver_key" => driver_key}) + end + + success + rescue e + track_error(ErrorType::DriverLoad, "Exception during module load: #{e.message}", {"module_id" => module_id, "driver_key" => driver_key}) + false end def unload(module_id : String) @@ -97,7 +138,19 @@ module PlaceOS::Core end def start(module_id : String, payload : String) - !!Protocol.request(Protocol::Message::Start.new(module_id, payload), expect: Protocol::Message::Success) + success = !!Protocol.request(Protocol::Message::Start.new(module_id, payload), expect: Protocol::Message::Success) + + unless success + driver_key = driver_key_for?(module_id) || 
"unknown" + error = ModuleInitError.new(module_id, driver_key, "Failed to start module") + track_module_init_error(error) + track_error(ErrorType::ModuleInit, "Failed to start module #{module_id}", {"driver_key" => driver_key}) + end + + success + rescue e + track_error(ErrorType::ModuleInit, "Exception during module start: #{e.message}", {"module_id" => module_id}) + false end def stop(module_id : String) @@ -328,5 +381,224 @@ module PlaceOS::Core message = match.pre_match unless match.nil? {message, exception} end + + # Error Tracking Methods + ############################################################################################### + + # Track an error for this edge + def track_error(type : ErrorType, message : String, context = {} of String => String, severity = Severity::Error) + error = EdgeError.new(@edge_id, type, message, context, severity) + + error_lock.synchronize do + recent_errors.push(error) + recent_errors.shift if recent_errors.size > 100 + + # Update health metrics + @connection_health = @connection_health.copy_with( + error_count_24h: @connection_health.error_count_24h + 1, + last_seen: Time.utc + ) + end + + Log.warn { { + edge_id: @edge_id, + error_type: type.to_s, + severity: severity.to_s, + message: message, + context: context.to_json, + } } + end + + # Track module initialization errors + def track_module_init_error(error : ModuleInitError) + error_lock.synchronize do + module_init_failures[error.module_id] << error + + # Keep only last 10 errors per module + if module_init_failures[error.module_id].size > 10 + module_init_failures[error.module_id].shift + end + + # Update failed modules list + failed_modules = @connection_health.failed_modules.dup + failed_modules << error.module_id unless failed_modules.includes?(error.module_id) + + @connection_health = @connection_health.copy_with( + failed_modules: failed_modules, + last_seen: Time.utc + ) + end + end + + # Track connection events + def track_connection_event(event_type : 
ConnectionEventType, error_message : String? = nil) + duration = case event_type + when .connected?, .reconnected? + nil + when .disconnected?, .failed? + Time.utc - @connection_start_time + else + nil + end + + case event_type + when .connected?, .reconnected? + error_lock.synchronize do + @connection_health = @connection_health.copy_with( + connected: true, + last_seen: Time.utc, + connection_uptime: Time.utc - @connection_start_time + ) + end + when .disconnected?, .failed? + error_lock.synchronize do + @connection_health = @connection_health.copy_with( + connected: false, + last_seen: Time.utc + ) + end + + if event_type.failed? + track_error(ErrorType::Connection, error_message || "Connection failed", severity: Severity::Critical) + end + end + end + + # Get recent errors for this edge + def get_recent_errors(limit : Int32 = 50) : Array(EdgeError) + error_lock.synchronize do + errors = recent_errors.to_a + if errors.size > limit + errors[-limit..-1] + else + errors + end + end + end + + # Get module initialization failures + def get_module_init_failures : Hash(String, Array(ModuleInitError)) + error_lock.synchronize do + module_init_failures.dup + end + end + + # Get edge health status + def get_edge_health : EdgeHealth + error_lock.synchronize do + # Update module count + module_count = get_module_managers.size + @connection_health = @connection_health.copy_with( + module_count: module_count, + last_seen: Time.utc, + connection_uptime: @connection_health.connected ? Time.utc - @connection_start_time : @connection_health.connection_uptime + ) + @connection_health + end + end + + # Get edge module status + def get_edge_module_status : EdgeModuleStatus + error_lock.synchronize do + managers = get_module_managers + total_modules = managers.size + + # Count running modules (simplified - assumes loaded = running) + running_modules = managers.count { |_, manager| manager.running? 
rescue false }
+
+        failed_modules = @connection_health.failed_modules.dup
+        init_errors = module_init_failures.values.flatten
+
+        EdgeModuleStatus.new(@edge_id, total_modules, running_modules, failed_modules, init_errors)
+      end
+    end
+
+    # Clear old errors (called periodically)
+    def cleanup_old_errors(older_than : Time::Span = 24.hours)
+      cutoff_time = Time.utc - older_than
+
+      error_lock.synchronize do
+        # Remove old errors
+        recent_errors.reject! { |error| error.timestamp < cutoff_time }
+
+        # Recount over a true 24h window, whatever retention `older_than` asks for
+        @connection_health = @connection_health.copy_with(
+          error_count_24h: recent_errors.count { |error| error.timestamp > Time.utc - 24.hours }
+        )
+
+        # Clean up old module init failures (reject! avoids deleting from the hash mid-iteration)
+        module_init_failures.reject! do |_, errors|
+          errors.reject! { |error| error.timestamp < cutoff_time }
+          errors.empty?
+        end
+      end
+    end
+
+    # Protocol message handlers
+    ###############################################################################################
+
+    private def process_edge_error_report(request : Protocol::Message::ErrorReport)
+      Log.info { {message: "processing error report from edge", edge_id: request.edge_id, error_count: request.errors.size} }
+
+      request.errors.each do |error_json|
+        # Parse outside the lock: on failure `track_error` acquires `error_lock` itself
+        edge_error = EdgeError.from_json(error_json)
+
+        # Ensure the edge_id matches
+        if edge_error.edge_id != @edge_id
+          Log.warn { {message: "edge error report edge_id mismatch", expected: @edge_id, received: edge_error.edge_id} }
+          next
+        end
+
+        error_lock.synchronize do
+          # `Deque.new(100)` only sets the initial capacity, so enforce the cap (as `track_error` does)
+          recent_errors.push(edge_error)
+          recent_errors.shift if recent_errors.size > 100
+
+          # Update connection health error count
+          cutoff_time = Time.utc - 24.hours
+          @connection_health = @connection_health.copy_with(
+            error_count_24h: recent_errors.count { |e| e.timestamp > cutoff_time },
+            last_seen: Time.utc
+          )
+        end
+
+        Log.debug { {message: "stored edge error", edge_id: @edge_id, error_type: edge_error.error_type.to_s, severity: edge_error.severity.to_s} }
+      rescue ex : JSON::ParseException
+        Log.error(exception: ex) { {message: "failed to parse edge error from JSON", error_json: error_json} }
+        track_error(ErrorType::Connection, "Failed to parse edge error report: #{ex.message}")
+      end
+    end
+
+    private def process_edge_health_report(request : Protocol::Message::HealthReport)
+      Log.info { {message: "processing health report from edge", edge_id: request.edge_id} }
+
+      begin
+        # Parse the JSON serialized EdgeHealth
+        edge_health = EdgeHealth.from_json(request.health)
+
+        # Ensure the edge_id matches
+        if edge_health.edge_id == @edge_id
+          error_lock.synchronize do
+            # Update our connection health with data from edge
+            @connection_health = @connection_health.copy_with(
+              connected: edge_health.connected,
+              last_seen: Time.utc,
+              module_count: edge_health.module_count,
+              failed_modules: edge_health.failed_modules,
+              # Keep our own connection_uptime calculation
+              connection_uptime: Time.utc - @connection_start_time
+            )
+          end
+
+          Log.debug { {message: "updated edge health", edge_id: @edge_id, module_count: edge_health.module_count, failed_modules: edge_health.failed_modules.size} }
+        else
+          Log.warn { {message: "edge health report edge_id mismatch", expected: @edge_id, received: edge_health.edge_id} }
+        end
+      rescue ex : JSON::ParseException
+        Log.error(exception: ex) { {message: "failed to parse edge health from JSON", health_json: request.health} }
+        track_error(ErrorType::Connection, "Failed to parse edge health report: #{ex.message}")
+      end
+    end
   end
 end
diff --git a/src/placeos-edge/client.cr b/src/placeos-edge/client.cr
index 9e89b48d..6342aa7c 100644
--- a/src/placeos-edge/client.cr
+++ b/src/placeos-edge/client.cr
@@ -1,11 +1,13 @@
 require "simple_retry"
 require "rwlock"
 require "uri"
+require "tasker"
 
 require "placeos-driver/protocol/management"
 
 require "../placeos-core/process_manager/common"
 require "../placeos-core/driver_manager"
+require "../placeos-core/edge_error" require "./constants" require "./protocol" @@ -41,6 +43,14 @@ module PlaceOS::Edge # module_id => payload @pending_start = {} of String => String + # Error tracking and reporting + private getter recent_errors = Deque(Core::EdgeError).new(100) + private getter error_lock = Mutex.new + private getter edge_id : String { @secret.split("-").first? || "unknown" } + private getter error_report_task : Tasker::Repeat(Nil)? + private getter health_report_task : Tasker::Repeat(Nil)? + private getter connection_start_time : Time = Time.utc + getter host : String { uri.to_s.gsub(uri.request_target, "") } def initialize( @@ -81,6 +91,9 @@ module PlaceOS::Edge @loading_modules = Hash(String, Array(String)).new { |hash, key| hash[key] = [] of String } @pending_start = {} of String => String end + + # Stop periodic reporting on disconnect + stop_periodic_reporting nil }, on_connect: -> { @@ -105,6 +118,9 @@ module PlaceOS::Edge # Send ping frames spawn { transport.ping if ping? } + # Start periodic reporting + start_periodic_reporting + yield close_channel.receive? 
@@ -149,6 +165,7 @@ module PlaceOS::Edge
             module_id: request.module_id,
             message: "execute errored",
           } }
+          track_error(Core::ErrorType::ModuleExecution, "Module execution failed: #{error.message}", {"module_id" => request.module_id})
           ({false, {message: error.message, backtrace: error.backtrace?, code: error.code}.to_json, error.code})
         end
 
@@ -218,6 +235,7 @@ module PlaceOS::Edge
           response = Protocol.request(registration_message, expect: Protocol::Message::RegisterResponse)
           unless response
             Log.warn { "failed to register to core" }
+            track_error(Core::ErrorType::Connection, "Failed to register to core during handshake", severity: Core::Severity::Critical)
             raise "handshake failed"
           end
 
@@ -242,6 +260,7 @@ module PlaceOS::Edge
           Log.info { "handshake success, edge registered" }
         rescue error
           Log.error(exception: error) { "during handshake" }
+          track_error(Core::ErrorType::Connection, "Handshake failed: #{error.message}", severity: Core::Severity::Critical)
           raise error
         end
       end
@@ -267,6 +286,7 @@ module PlaceOS::Edge
           when wait_load.receive?
           when timeout(90.seconds)
             Log.error { "timeout loading #{driver_key}" }
+            track_error(Core::ErrorType::DriverLoad, "Timeout loading driver binary", {"driver_key" => driver_key})
           end
         end
       end
@@ -355,6 +375,7 @@ module PlaceOS::Edge
       end
     rescue error
       Log.error(exception: error) { "error during download attempt" }
+      track_error(Core::ErrorType::DriverLoad, "Error downloading driver binary: #{error.message}", {"driver_key" => key})
       spawn { attempt_download(loaded_channel, key) } unless loaded_channel.closed?
     end
 
@@ -541,5 +562,139 @@ module PlaceOS::Edge
       raise "cannot send request over closed transport" if t.nil?
       t.send_request(request)
     end
+
+    # Periodic Reporting
+    ###########################################################################
+
+    # Start periodic error and health reporting tasks
+    protected def start_periodic_reporting
+      # Send error reports every 5 minutes
+      @error_report_task = Tasker.every(5.minutes) do
+        send_error_report
+      end
+
+      # Send health reports every 2 minutes
+      @health_report_task = Tasker.every(2.minutes) do
+        send_health_report
+      end
+
+      Log.info { {edge_id: edge_id, message: "started periodic reporting tasks"} }
+    end
+
+    # Stop periodic reporting tasks
+    protected def stop_periodic_reporting
+      @error_report_task.try(&.cancel)
+      @health_report_task.try(&.cancel)
+      @error_report_task = nil
+      @health_report_task = nil
+
+      Log.info { {edge_id: edge_id, message: "stopped periodic reporting tasks"} }
+    end
+
+    # Send error report to core.
+    #
+    # Reports the errors tracked during the last 5 minutes. Matching errors are
+    # serialized while holding `error_lock`; the request is made after the lock
+    # is released so `track_error` callers never wait on a network round trip.
+    # Does nothing when there is no transport or nothing to report.
+    protected def send_error_report : Nil
+      return if transport?.nil?
+
+      # Only errors newer than the 5 minute reporting interval are sent
+      cutoff_time = Time.utc - 5.minutes
+      error_json_array = error_lock.synchronize do
+        recent_errors.select { |error| error.timestamp > cutoff_time }.map(&.to_json)
+      end
+
+      return if error_json_array.empty?
+
+      request = Protocol::Message::ErrorReport.new(edge_id, error_json_array)
+
+      begin
+        Protocol.request(request, expect: Protocol::Message::Success)
+        Log.debug { {edge_id: edge_id, error_count: error_json_array.size, message: "sent error report"} }
+      rescue e
+        Log.error(exception: e) { {edge_id: edge_id, message: "error sending error report"} }
+      end
+    end
+
+    # Send health report to core.
+    #
+    # Does nothing when there is no transport; failures are logged, not raised.
+    protected def send_health_report
+      return if transport?.nil?
+
+      begin
+        request = Protocol::Message::HealthReport.new(edge_id, get_edge_health.to_json)
+
+        Protocol.request(request, expect: Protocol::Message::Success)
+        Log.debug { {edge_id: edge_id, message: "sent health report"} }
+      rescue e
+        Log.error(exception: e) { {edge_id: edge_id, message: "error sending health report"} }
+      end
+    end
+
+    # Track an error for this edge.
+    #
+    # The error is appended to `recent_errors` (capped at the newest 100
+    # entries) and logged as a warning. Errors with critical severity are also
+    # reported to core straight away from a separate fiber.
+    def track_error(type : Core::ErrorType, message : String, context = {} of String => String, severity = Core::Severity::Error)
+      error = Core::EdgeError.new(edge_id, type, message, context, severity)
+
+      error_lock.synchronize do
+        recent_errors.push(error)
+        recent_errors.shift if recent_errors.size > 100
+      end
+
+      Log.warn { {
+        edge_id:    edge_id,
+        error_type: type.to_s,
+        severity:   severity.to_s,
+        message:    message,
+        context:    context.to_json,
+      } }
+
+      # Send immediate error report for critical errors
+      if severity.critical?
+        spawn { send_immediate_error_report(error) }
+      end
+    end
+
+    # Send a single error to core immediately (used for critical errors).
+    #
+    # Does nothing when there is no transport; failures are logged, not raised.
+    protected def send_immediate_error_report(error : Core::EdgeError)
+      return if transport?.nil?
+
+      begin
+        request = Protocol::Message::ErrorReport.new(edge_id, [error.to_json])
+
+        Protocol.request(request, expect: Protocol::Message::Success)
+        Log.info { {edge_id: edge_id, error_type: error.error_type.to_s, message: "sent immediate error report"} }
+      rescue e
+        Log.error(exception: e) { {edge_id: edge_id, message: "error sending immediate error report"} }
+      end
+    end
+
+    # Get edge health status.
+    #
+    # A missing transport is reported as `connected: false` instead of raising,
+    # and one timestamp is used for every time-based field of the snapshot.
+    protected def get_edge_health : Core::EdgeHealth
+      now = Time.utc
+      day_ago = now - 24.hours
+      failed_modules = [] of String # TODO: Track failed modules
+
+      Core::EdgeHealth.new(
+        edge_id: edge_id,
+        connected: transport?.try { |t| !t.closed? } || false,
+        last_seen: now,
+        module_count: modules.size,
+        error_count_24h: error_lock.synchronize { recent_errors.count { |e| e.timestamp > day_ago } },
+        connection_uptime: now - connection_start_time,
+        failed_modules: failed_modules
+      )
+    end
   end
 end
diff --git a/src/placeos-edge/protocol.cr b/src/placeos-edge/protocol.cr
index 2a8434e1..52c7c005 100644
--- a/src/placeos-edge/protocol.cr
+++ b/src/placeos-edge/protocol.cr
@@ -125,6 +125,8 @@ module PlaceOS::Edge::Protocol
       ProxyRedis # Success
       FetchBinary
       SettingsAction # Success
+      ErrorReport    # Success
+      HealthReport   # Success
 
       # Response
       Success
@@ -316,6 +318,22 @@ module PlaceOS::Edge::Protocol
       end
     end
 
+    struct ErrorReport < Client::Request
+      getter errors : Array(String) # JSON serialized EdgeError objects
+      getter edge_id : String
+
+      def initialize(@edge_id, @errors)
+      end
+    end
+
+    struct HealthReport < Client::Request
+      getter health : String # JSON serialized EdgeHealth object
+      getter edge_id : String
+
+      def initialize(@edge_id, @health)
+      end
+    end
+
     # Responses
     ###############################################################################################
 
diff --git a/src/placeos-edge/server.cr b/src/placeos-edge/server.cr
index d7c519ca..4c6661d6 100644
--- a/src/placeos-edge/server.cr
+++ b/src/placeos-edge/server.cr
@@ -5,6 +5,7
@@ require "./protocol"
 require "./transport"
 
 require "../placeos-core/process_manager/edge"
+require "../placeos-core/edge_error"
 
 module PlaceOS::Edge
   class Server
@@ -27,7 +28,10 @@ module PlaceOS::Edge
         Log.info { {edge_id: edge_id, message: "managing edge"} }
         socket.on_close do
           edges_lock.write do
-            edges.delete(edge_id) if edges[edge_id]? == socket
+            if manager = edges[edge_id]?
+              manager.track_connection_event(PlaceOS::Core::ConnectionEventType::Disconnected)
+              edges.delete(edge_id)
+            end
           end
         end
 
@@ -50,9 +54,85 @@ module PlaceOS::Edge
     end
 
     # :ditto:
-    def for?(edge_id : String, & : ProcessManager::Edge)
+    def for?(edge_id : String, &block : PlaceOS::Core::ProcessManager::Edge ->)
       manager = for?(edge_id)
       yield manager unless manager.nil?
     end
+
+    # Health Monitoring and Error Aggregation
+    ###############################################################################################
+
+    # Health of every registered edge manager, keyed by edge id
+    def edge_health_status : Hash(String, PlaceOS::Core::EdgeHealth)
+      edges_lock.read do
+        edges.transform_values { |manager| manager.get_edge_health }
+      end
+    end
+
+    # Recent errors of every registered edge manager, keyed by edge id
+    def collect_edge_errors : Hash(String, Array(PlaceOS::Core::EdgeError))
+      edges_lock.read do
+        edges.transform_values { |manager| manager.get_recent_errors }
+      end
+    end
+
+    # Connection metrics for every edge, built from each manager's current health
+    def edge_connection_metrics : Hash(String, PlaceOS::Core::ConnectionMetrics)
+      edges_lock.read do
+        edges.transform_values do |manager|
+          health = manager.get_edge_health
+          online = health.connected
+          seen_at = health.last_seen
+
+          PlaceOS::Core::ConnectionMetrics.new(
+            edge_id: manager.edge_id,
+            total_connections: 1, # Simplified - would need more tracking
+            failed_connections: online ? 0 : 1,
+            average_uptime: health.connection_uptime,
+            last_connection_attempt: seen_at,
+            last_successful_connection: online ? seen_at : Time.utc - 1.day
+          )
+        end
+      end
+    end
+
+    # Module status of every registered edge manager, keyed by edge id
+    def edge_module_status : Hash(String, PlaceOS::Core::EdgeModuleStatus)
+      edges_lock.read do
+        edges.transform_values { |manager| manager.get_edge_module_status }
+      end
+    end
+
+    # Recent errors for one edge (`limit` is forwarded to the manager); empty when no manager is registered
+    def edge_errors(edge_id : String, limit : Int32 = 50) : Array(PlaceOS::Core::EdgeError)
+      if manager = for?(edge_id)
+        manager.get_recent_errors(limit)
+      else
+        [] of PlaceOS::Core::EdgeError
+      end
+    end
+
+    # Module initialization failures for one edge; empty when no manager is registered
+    def edge_module_failures(edge_id : String) : Hash(String, Array(PlaceOS::Core::ModuleInitError))
+      if manager = for?(edge_id)
+        manager.get_module_init_failures
+      else
+        {} of String => Array(PlaceOS::Core::ModuleInitError)
+      end
+    end
+
+    # `connected` flag from each manager's health, keyed by edge id
+    def edge_connection_status : Hash(String, Bool)
+      edges_lock.read do
+        edges.transform_values { |manager| manager.get_edge_health.connected }
+      end
+    end
+
+    # Forward `older_than` to the cleanup_old_errors of every registered edge manager
+    def cleanup_old_errors(older_than : Time::Span = 24.hours)
+      edges_lock.read do
+        edges.each_value { |manager| manager.cleanup_old_errors(older_than) }
+      end
+    end
   end
 end