diff --git a/BUILD b/BUILD index 55df6284..96dd380f 100644 --- a/BUILD +++ b/BUILD @@ -12,7 +12,7 @@ cc_library( ]), hdrs = glob([ "include/**", - "framework/include/*.h", + "framework/include/*.h*", ]), includes = [ "include", diff --git a/framework/include/vx_context.h b/framework/include/vx_context.h index d6c9be4d..f54f4a80 100644 --- a/framework/include/vx_context.h +++ b/framework/include/vx_context.h @@ -18,6 +18,7 @@ #include +#include "vx_event_queue.hpp" #include "vx_internal.h" #include "vx_reference.h" @@ -207,7 +208,7 @@ class Context : public Reference /*! \brief The number of available targets in the implementation */ vx_uint32 num_targets; /*! \brief The list of implemented targets */ - vx_target targets[VX_INT_MAX_NUM_TARGETS]; + vx_target targets[VX_INT_MAX_NUM_TARGETS]; /*! \brief The list of priority sorted target indexes */ vx_uint32 priority_targets[VX_INT_MAX_NUM_TARGETS]; /*! \brief The log callback for errors */ @@ -266,6 +267,10 @@ class Context : public Reference cl_context opencl_context; cl_command_queue opencl_command_queue; #endif +#ifdef OPENVX_USE_EVENTS + /*! \brief The event queue for the context */ + EventQueue event_queue; +#endif }; #endif /* VX_CONTEXT_H */ diff --git a/framework/include/vx_event_queue.hpp b/framework/include/vx_event_queue.hpp new file mode 100644 index 00000000..ae81ff26 --- /dev/null +++ b/framework/include/vx_event_queue.hpp @@ -0,0 +1,256 @@ +/** + * @file vx_event_queue.hpp + * @brief + * @version 0.1 + * @date 2025-05-09 + * + * @copyright Copyright (c) 2025 + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +/** + * @brief Internal Event Queue Object + * + */ +class EventQueue +{ + // Registration structure that stores additional event parameters for a vx_reference. + struct RegistrationEntry + { + vx_reference ref; // vx_reference being registered + vx_event_type_e type; // Event type associated with this registration + vx_uint32 param; // Optional extra parameter (e.g., graph parameter index) + vx_uint32 app_value; // Application-defined value to associate with events + }; + +public: + /** + * @brief Construct a new Event Queue object + * + * @param enabled + * @param max_size + */ + explicit EventQueue(bool enabled = false, size_t max_size = 128) + : enabled_(enabled), max_size_(max_size) {} + + + /** + * @brief Clear the event queue + * + */ + void clear() + { + std::unique_lock lock(mutex_); + queue_.clear(); + } + + /** + * @brief Update the event queue status + * + * @param status true to enable, false to disable + */ + vx_status status(bool status) + { + std::lock_guard lock(mutex_); + enabled_ = status; + return VX_SUCCESS; + } + + /** + * @brief Check if the event queue is enabled + * + * @return true if enabled + * @return false if disabled + */ + bool isEnabled() const + { + return enabled_; + } + + /** + * @brief Push an event to the queue + * + * @param type Event type + * @param data Optional event value + * @param info Optional event info pointer + * @param reg Optional vx_reference used to look up additional registration data + * @return true if successful + * @return false if failed + */ + vx_status push(vx_event_type_e type, vx_uint32 data = 0, vx_event_info_t* info = nullptr, + vx_reference ref = nullptr) + { + vx_event_t evt{}; + evt.type = type; + evt.timestamp = getTimestamp(); + evt.app_value = data; + if (info != nullptr) + { + evt.event_info = *info; + } + + // If a reference is provided, look up additional registration info + if (ref != nullptr) + { + const RegistrationEntry* reg = lookupRegistration(ref, type); + if (reg && evt.type == reg->type) + { + // override the app_value with that in the registration. + evt.app_value = reg->app_value; + } + } + + return push(evt); + } + + /** + * @brief Push an event to the queue + * + * @param event Event to push + * @return true if successful + * @return false if failed + */ + vx_status push(vx_event_t event) + { + vx_status status = VX_SUCCESS; + std::unique_lock lock(mutex_); + + if (!enabled_) + { + status = VX_FAILURE; + } + + if (VX_SUCCESS == status) + { + if (queue_.size() >= max_size_) + { + queue_.pop_front(); // Drop the oldest event + } + queue_.emplace_back(std::move(event)); + cv_.notify_one(); + } + + return status; + } + + vx_status wait(vx_event_t* event, vx_bool do_not_block) + { + vx_status status = VX_SUCCESS; + std::optional evt; + std::unique_lock lock(mutex_); + + if (!enabled_) + { + status = VX_FAILURE; + } + + if (VX_SUCCESS == status) + { + if (do_not_block) + { + evt = wait_and_pop(std::chrono::milliseconds(0)); + } + else + { + evt = wait_and_pop(); + } + + if (std::nullopt == evt) + { + status = VX_FAILURE; + } + } + + if (VX_SUCCESS == status) + { + *event = std::move(*evt); + } + + return status; + } + + vx_status registerEvent(vx_reference ref, vx_event_type_e type, vx_uint32 param, + vx_uint32 app_value) + { + if (ref == nullptr) return VX_ERROR_INVALID_REFERENCE; + std::unique_lock lock(mutex_); + + RegistrationEntry entry; + entry.ref = ref; + entry.type = type; + entry.param = param; + entry.app_value = app_value; + + registrations_.push_back(entry); + return VX_SUCCESS; + } + +private : bool enabled_; + mutable std::mutex mutex_; + std::condition_variable cv_; + std::deque queue_; + size_t max_size_; + std::vector registrations_; + + /** + * @brief Lookup a registration entry matching the given vx_reference and event type. + * + * @param ref The vx_reference to look up. + * @param type The event type. + * @return const RegistrationEntry* Pointer to the matching entry, or nullptr if not found. + */ + const RegistrationEntry* lookupRegistration(vx_reference ref, vx_event_type_e type) const + { + for (const auto& entry : registrations_) + { + if (entry.ref == ref && entry.type == type) + { + return &entry; + } + } + + return nullptr; + } + + /** + * @brief Wait for an event and pop it from the queue + * + * @param timeout Timeout duration + * @return std::optional Event if available, otherwise std::nullopt + */ + std::optional wait_and_pop( + std::chrono::milliseconds timeout = std::chrono::milliseconds::max()) + { + std::unique_lock lock(mutex_); + if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); })) + { + return std::nullopt; // Timeout + } + + vx_event_t evt = std::move(queue_.front()); + queue_.pop_front(); + return evt; + } + + /** + * @brief Get the current timestamp in nanoseconds + * + * @return vx_uint64 Current timestamp + */ + vx_uint64 getTimestamp() const + { + auto now = std::chrono::steady_clock::now(); + return std::chrono::duration_cast(now.time_since_epoch()).count(); + } +}; diff --git a/framework/src/vx_event_queue.cpp b/framework/src/vx_event_queue.cpp index 36b66f37..0a27eb63 100755 --- a/framework/src/vx_event_queue.cpp +++ b/framework/src/vx_event_queue.cpp @@ -13,41 +13,161 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -#ifdef OPENVX_USE_PIPELINING - #include -#include #include +#include #include "vx_internal.h" +#ifdef OPENVX_USE_EVENTS + VX_API_ENTRY vx_status VX_API_CALL vxEnableEvents(vx_context context) { - return VX_ERROR_NOT_IMPLEMENTED; + vx_status status = VX_SUCCESS; + + if (vx_false_e == Context::isValidContext(context)) + { + VX_PRINT(VX_ZONE_ERROR, "context is invalid\n"); + status = VX_ERROR_INVALID_REFERENCE; + } + + if (VX_SUCCESS == status) + { + context->event_queue.status(vx_true_e); + status = VX_SUCCESS; + } + + return status; } VX_API_ENTRY vx_status VX_API_CALL vxDisableEvents(vx_context context) { - return VX_ERROR_NOT_IMPLEMENTED; + vx_status status = VX_SUCCESS; + + if (vx_false_e == Context::isValidContext(context)) + { + VX_PRINT(VX_ZONE_ERROR, "context is invalid\n"); + status = VX_ERROR_INVALID_REFERENCE; + } + + if (VX_SUCCESS == status) + { + status = context->event_queue.status(vx_false_e); + } + + return status; } VX_API_ENTRY vx_status VX_API_CALL vxSendUserEvent(vx_context context, vx_uint32 id, void *parameter) { - return VX_ERROR_NOT_IMPLEMENTED; + vx_status status = VX_SUCCESS; + + if (Context::isValidContext(context) == vx_false_e) + { + VX_PRINT(VX_ZONE_ERROR, "context is invalid\n"); + status = (vx_status)VX_ERROR_INVALID_REFERENCE; + } + + if (VX_SUCCESS == status) + { + if (!context->event_queue.isEnabled()) + { + VX_PRINT(VX_ZONE_ERROR, "events are disabled\n"); + status = VX_FAILURE; + } + } + + if (VX_SUCCESS == status) + { + vx_event_info_t event_info; + event_info.user_event.user_event_parameter = parameter; + status = context->event_queue.push(VX_EVENT_USER, id, &event_info); + } + return status; } VX_API_ENTRY vx_status VX_API_CALL vxWaitEvent( vx_context context, vx_event_t *event, vx_bool do_not_block) { - return VX_ERROR_NOT_IMPLEMENTED; + vx_status status = VX_SUCCESS; + + if (Context::isValidContext(context) == vx_false_e) + { + VX_PRINT(VX_ZONE_ERROR,"context is invalid\n"); + status = (vx_status)VX_ERROR_INVALID_REFERENCE; + } + + if (VX_SUCCESS == status) + { + if (!context->event_queue.isEnabled()) + { + VX_PRINT(VX_ZONE_ERROR, "events are disabled\n"); + status = VX_FAILURE; + } + } + + if (VX_SUCCESS == status) + { + /* Call general wait function */ + status = context->event_queue.wait(event, do_not_block); + } + + return status; } -VX_API_ENTRY vx_status VX_API_CALL vxRegisterEvent(vx_reference ref, - enum vx_event_type_e type, vx_uint32 param, vx_uint32 app_value) +VX_API_ENTRY vx_status VX_API_CALL vxRegisterEvent( + vx_reference ref, vx_event_type_e type, vx_uint32 param, vx_uint32 app_value) { - return VX_ERROR_NOT_IMPLEMENTED; + vx_status status = VX_SUCCESS; + + if (vx_false_e == Reference::isValidReference(ref)) + { + VX_PRINT(VX_ZONE_ERROR, "ref is invalid\n"); + status = (vx_status)VX_ERROR_INVALID_REFERENCE; + } + + if (VX_SUCCESS == status) + { + if (!ref->context->event_queue.isEnabled()) + { + VX_PRINT(VX_ZONE_ERROR, "events are disabled\n"); + status = VX_FAILURE; + } + } + + if (VX_SUCCESS == status) + { + if (ref->type != VX_TYPE_GRAPH && + ref->type != VX_TYPE_NODE && + ref->type != VX_TYPE_PARAMETER) + { + VX_PRINT(VX_ZONE_ERROR, "ref is not a graph or node\n"); + status = VX_ERROR_INVALID_PARAMETERS; + } + } + + if (VX_SUCCESS == status) + { + // Only some event types are allowed per spec + switch (type) + { + case VX_EVENT_GRAPH_COMPLETED: + case VX_EVENT_NODE_COMPLETED: + case VX_EVENT_NODE_ERROR: + case VX_EVENT_USER: + break; + default: + status = VX_ERROR_INVALID_PARAMETERS; + } + } + + if (VX_SUCCESS == status) + { + status = ref->context->event_queue.registerEvent(ref, type, param, app_value); + } + + return status; } -#endif /* OPENVX_USE PIPELINING */ +#endif /* OPENVX_USE EVENTS */ diff --git a/framework/src/vx_graph.cpp b/framework/src/vx_graph.cpp index 355ad720..eef05f3d 100644 --- a/framework/src/vx_graph.cpp +++ b/framework/src/vx_graph.cpp @@ -2509,8 +2509,66 @@ static vx_status vxExecuteGraph(vx_graph graph, vx_uint32 depth) } } + if (action == VX_ACTION_CONTINUE) + { + if (graph->context->event_queue.isEnabled()) + { + // Raise a node completed event. + vx_event_info_t event_info; + event_info.node_completed.graph = graph; + event_info.node_completed.node = node; + if (VX_SUCCESS != + graph->context->event_queue.push(VX_EVENT_NODE_COMPLETED, 0, + &event_info, (vx_reference)node)) + { + VX_PRINT(VX_ZONE_ERROR, + "Failed to push node completed event for node %s\n", + node->kernel->name); + } + + for (vx_uint32 gp = 0; gp < graph->numParams; gp++) + { + vx_node param_node = graph->parameters[gp].node; + vx_uint32 param_index = graph->parameters[gp].index; + + // If this node just executed and consumed a graph parameter + if (param_node == node) + { + vx_event_info_t event_info = {}; + event_info.graph_parameter_consumed.graph = graph; + event_info.graph_parameter_consumed.graph_parameter_index = + param_index; + + if (VX_SUCCESS != graph->context->event_queue.push( + VX_EVENT_GRAPH_PARAMETER_CONSUMED, 0, + &event_info, (vx_reference)graph)) + { + VX_PRINT( + VX_ZONE_ERROR, + "Failed to push graph parameter consumed event for " + "graph %p, param %u\n", + graph, gp); + } + } + } + } + } + if (action == VX_ACTION_ABANDON) { + // Raise a node error event. + vx_event_info_t event_info; + event_info.node_error.graph = graph; + event_info.node_error.node = node; + event_info.node_error.status = node->status; + if (graph->context->event_queue.isEnabled() && + VX_SUCCESS != graph->context->event_queue.push(VX_EVENT_NODE_ERROR, 0, + &event_info, + (vx_reference)node)) + { + VX_PRINT(VX_ZONE_ERROR, "Failed to push node error event for node %s\n", + node->kernel->name); + } break; } } @@ -2581,6 +2639,7 @@ static vx_status vxExecuteGraph(vx_graph graph, vx_uint32 depth) } VX_PRINT(VX_ZONE_GRAPH,"Process returned status %d\n", status); + // Report the performance of the graph execution. if (context->perf_enabled) { for (n = 0; n < graph->numNodes; n++) @@ -2598,9 +2657,22 @@ static vx_status vxExecuteGraph(vx_graph graph, vx_uint32 depth) } if (status == VX_SUCCESS) + { graph->state = VX_GRAPH_STATE_COMPLETED; + // Raise a graph completed event. + vx_event_info_t event_info; + event_info.graph_completed.graph = graph; + if (graph->context->event_queue.isEnabled() && + VX_SUCCESS != graph->context->event_queue.push(VX_EVENT_GRAPH_COMPLETED, 0, &event_info, + (vx_reference)graph)) + { + VX_PRINT(VX_ZONE_ERROR, "Failed to push graph completed event for graph %p\n", graph); + } + } else + { graph->state = VX_GRAPH_STATE_ABANDONED; + } return status; } diff --git a/include/VX/vx_corevx_ext.h b/include/VX/vx_corevx_ext.h index a0b6e221..0c4ffe6c 100644 --- a/include/VX/vx_corevx_ext.h +++ b/include/VX/vx_corevx_ext.h @@ -119,6 +119,7 @@ VX_API_ENTRY vx_status VX_API_CALL vxImportGraphFromDot(vx_graph graph, vx_char #define OPENVX_USE_OPENCL_INTEROP #define OPENVX_USE_NN #define OPENVX_USE_NN_16 +#define OPENVX_USE_EVENTS #if defined(__arm__) || defined(__arm64__) #define OPENVX_USE_TILING