From e7d9054deed077e0c007faecd0cd7d950d90a237 Mon Sep 17 00:00:00 2001 From: "M. Samil Atesoglu" Date: Tue, 6 Jan 2026 19:11:52 +0300 Subject: [PATCH 1/4] WIP: Add frame rate converter cross-thread node --- .../Nodes/FrameRateConverter.nosnode | 67 +++++ .../nosReflect/Source/BoundedQueueNodes.cpp | 13 +- .../nosReflect/Source/FrameRateConverter.cpp | 263 ++++++++++++++++++ Plugins/nosReflect/Source/PluginMain.cpp | 3 + Plugins/nosReflect/Source/RingBuffer.hpp | 102 ++++--- Plugins/nosReflect/Source/RingBufferNodes.cpp | 13 +- Plugins/nosReflect/Types/Reflect.fbs | 6 + 7 files changed, 418 insertions(+), 49 deletions(-) create mode 100644 Plugins/nosReflect/Nodes/FrameRateConverter.nosnode create mode 100644 Plugins/nosReflect/Source/FrameRateConverter.cpp diff --git a/Plugins/nosReflect/Nodes/FrameRateConverter.nosnode b/Plugins/nosReflect/Nodes/FrameRateConverter.nosnode new file mode 100644 index 00000000..4fb37eaf --- /dev/null +++ b/Plugins/nosReflect/Nodes/FrameRateConverter.nosnode @@ -0,0 +1,67 @@ +{ + "nodes": [ + { + "class_name": "FrameRateConverter", + "menu_info": { + "category": "Type", + "display_name": "Frame Rate Converter", + "name_aliases": [ + "circular buffer", + "circular queue", + "ring buffer" + ] + }, + "node": { + "name": "FrameRateConverter", + "display_name": "Frame Rate Converter", + "description": "Features a ring buffer that outputs data at a different rate than input rate.", + "contents_type": "Job", + "pins": [ + { + "name": "Thread", + "type_name": "nos.exe", + "show_as": "INPUT_PIN", + "can_show_as": "INPUT_PIN_ONLY" + }, + { + "name": "Input", + "type_name": "nos.Generic", + "show_as": "INPUT_PIN", + "can_show_as": "INPUT_PIN_ONLY" + }, + { + "name": "Capacity", + "type_name": "uint", + "show_as": "PROPERTY", + "can_show_as": "INPUT_PIN_OR_PROPERTY", + "description": "Number of frames to buffer. Will be larger if Ratio requires it.", + "data": 2, + "min": 1 + }, + { + "name": "Output", + "type_name": "nos.Generic", + "show_as": "OUTPUT_PIN", + "can_show_as": "OUTPUT_PIN_ONLY", + "live": true + }, + { + "name": "Ratio", + "type_name": "nos.fb.vec2u", + "show_as": "PROPERTY", + "can_show_as": "INPUT_PIN_OR_PROPERTY", + "data": { + "x": 1, + "y": 1 + }, + "min": { + "x": 1, + "y": 1 + } + } + ] + } + } + ], + "schema_version": "1.4-v1" +} diff --git a/Plugins/nosReflect/Source/BoundedQueueNodes.cpp b/Plugins/nosReflect/Source/BoundedQueueNodes.cpp index e1e63937..8ed589ef 100644 --- a/Plugins/nosReflect/Source/BoundedQueueNodes.cpp +++ b/Plugins/nosReflect/Source/BoundedQueueNodes.cpp @@ -5,8 +5,17 @@ namespace nos::reflect { -using CopyingBoundedQueueNode = RingBufferNodeBase; -using BoundedObjectQueueNode = RingBufferNodeBase; +struct CopyingBoundedQueueNode : RingBufferNodeBase +{ + CopyingBoundedQueueNode() + : RingBufferNodeBase(RingBufferServeMode::ServeImmediately) {} +}; + +struct BoundedObjectQueueNode : RingBufferNodeBase +{ + BoundedObjectQueueNode() + : RingBufferNodeBase(RingBufferServeMode::ServeImmediately) {} +}; nosResult RegisterCopyingBoundedQueue(nosNodeFunctions* funcs) { diff --git a/Plugins/nosReflect/Source/FrameRateConverter.cpp b/Plugins/nosReflect/Source/FrameRateConverter.cpp new file mode 100644 index 00000000..040e6c18 --- /dev/null +++ b/Plugins/nosReflect/Source/FrameRateConverter.cpp @@ -0,0 +1,263 @@ +#include "RingBuffer.hpp" + +namespace nos::reflect +{ + +struct FrameRateConverterNode : NodeContext +{ + nos::Name TypeName = NSN_TypeNameGeneric; + + RingBuffer> Ring; + uint32_t Capacity = 1; + uint32_t RemainingRepeatCount = 0; + fb::vec2u Ratio = {1, 1}; + + bool CapacityUpdatedViaPathCommand = false; + + FrameRateConverterNode() : Ring(1, RingBufferServeMode::WaitUntilFull) + { + Ring.Reset(Capacity); + AddPinValueWatcher(NOS_NAME("Capacity"), [this](const uint32_t* newCapacity, std::optional oldCapacity) + { + if (*newCapacity != Capacity) + { + Capacity = std::max(1u, *newCapacity); + if (*newCapacity != Capacity) + { + nosEngine.LogW("%s: Capacity cannot be %lu.", GetItemPath(NodeId).value_or("").c_str(), *newCapacity); + SetPinValue(NOS_NAME("Capacity"), Capacity); + return; + } + if (!CapacityUpdatedViaPathCommand) + { + nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity}; + nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange); + CapacityUpdatedViaPathCommand = false; + } + SendPathRestart(NSN_Input); + } + }); + AddPinValueWatcher(NOS_NAME("Ratio"), [this](const fb::vec2u* newRatio, std::optional oldRatio) + { + if (*newRatio != Ratio) + { + if (newRatio->x() == 0 || newRatio->y() == 0) + { + nosEngine.LogW("%s: Ratio components cannot be 0.", GetItemPath(NodeId).value_or("").c_str()); + SetPinValue(NOS_NAME("Ratio"), Ratio); + return; + } + Ratio = *newRatio; + SendPathRestart(NodeId); + } + }); + } + + void SendRingStats(const char* state) const + { + auto nodeName = NodeName.AsString(); + nosEngine.WatchLog((nodeName + " Size").c_str(), std::to_string(Ring.GetSize()).c_str()); + nosEngine.WatchLog((nodeName + " Capacity").c_str(), std::to_string(Ring.GetCapacity()).c_str()); + nosEngine.WatchLog((nodeName + " State").c_str(), state); + } + + void OnPathStart() override + { + Ring.Reset(Capacity); + RemainingRepeatCount = Capacity - 1; + SendScheduleRequest(Capacity); + } + + void OnPathStop() override + { + Ring.Shutdown(); + } + + nosResult OnCreate(nosFbNodePtr node) override + { + for (auto pin : *node->pins()) + { + auto name = nos::Name(pin->name()->c_str()); + if (NSN_Output == name) + { + if (pin->type_name()->c_str() == NSN_TypeNameGeneric.AsString()) + continue; + SetType(nos::Name(pin->type_name()->c_str())); + } + } + return NOS_RESULT_SUCCESS; + } + + nosResult CopyFrom(nosCopyFromInfo* cpy) override + { + SendRingStats("Pre Begin Pop"); + if (Ring.GetMode() == RingBufferServeMode::WaitUntilFull) + { + if (RemainingRepeatCount > 0) + { + --RemainingRepeatCount; + return NOS_RESULT_SUCCESS; + } + } + uint32_t popCount = Ratio.y(); + std::vector outputObjectRefs; + uint64_t frameNumber = 0; + while (popCount > 0) + { + std::unique_ptr* srcSlot; + { + ScopedProfilerEvent _({ .Name = "Wait For Read" }); + srcSlot = Ring.BeginPop(100); + } + if (srcSlot && *srcSlot) + { + auto& slot = *srcSlot; + frameNumber = slot->FrameNumber; + outputObjectRefs.push_back(std::move(slot->Object)); + Ring.EndPop(); + SendRingStats("Post Begin Pop"); + } + else if (Ring.IsShuttingDown()) + { + return NOS_RESULT_FAILED; + } + else + { + // Timeout + if (outputObjectRefs.empty()) + return NOS_RESULT_PENDING; + break; + } + --popCount; + } + if (!outputObjectRefs.empty()) + { + // Convert ObjectRefs to IDs for the API call + std::vector outputObjects; + outputObjects.reserve(outputObjectRefs.size()); + for (const auto& ref : outputObjectRefs) + outputObjects.push_back(ref.GetObjectId()); + + ObjectRef outputArrayObject; + auto res = nosEngine.ObjectAPI->CreateArrayObject( + TypeName, outputObjects.data(), outputObjects.size(), &outputArrayObject.GetStorage()); + if (res != NOS_RESULT_SUCCESS) + return res; + SetPinObject(NSN_Output, outputArrayObject); + SendScheduleRequest(1); + return NOS_RESULT_SUCCESS; + } + return NOS_RESULT_PENDING; + } + + void OnPathCommand(const nosPathCommand* command) override + { + switch (command->Event) + { + case NOS_RING_SIZE_CHANGE: { + if (command->RingSize == 0) + { + nosEngine.LogW((GetDisplayName() + " capacity cannot be 0.").c_str()); + return; + } + CapacityUpdatedViaPathCommand = true; + SetPinValue(NOS_NAME("Capacity"), command->RingSize); + break; + } + default: + return; + } + } + + nosResult OnResolvePinDataTypes(nosResolvePinDataTypesParams* params) override + { + TypeInfo typeInfo(params->IncomingTypeName); + if (typeInfo->BaseType != NOS_BASE_TYPE_ARRAY) + { + strncpy(params->OutErrorMessage, "Connected pin type must be an array type!", 42); + return NOS_RESULT_FAILED; + } + return NOS_RESULT_SUCCESS; + } + + void OnPinUpdated(nosPinUpdate const* update) override + { + if (TypeName != NSN_TypeNameGeneric) + return; + if (update->UpdatedField == NOS_PIN_FIELD_TYPE_NAME) + { + if (update->PinName != NSN_Input) + return; + SetType(update->TypeName); + } + } + + void SetType(nos::Name typeName) + { + TypeName = typeName; + SendPathRestart(NodeId); + } + + nosResult ExecuteNode(NodeExecuteParams const& params) override + { + if (NSN_TypeNameGeneric == TypeName) + return NOS_RESULT_FAILED; + ArrayObjectRef inputArrayObject = params.GetPinObject(NSN_Input); + if (!inputArrayObject) + return NOS_RESULT_FAILURE; + auto capacity = *InterpretObject(*params[NOS_NAME("Capacity")].Object); + capacity = std::max(1u, capacity); + + SendRingStats("Pre Push"); + uint32_t pushCount = Ratio.x(); + for (auto inputObject : inputArrayObject) + { + std::unique_ptr* dstSlot; + { + ScopedProfilerEvent _({ .Name = "Wait For Empty Slot" }); + dstSlot = Ring.BeginPush(100); + } + if (dstSlot) + { + if (!*dstSlot) + *dstSlot = std::make_unique(inputArrayObject); + auto& slot = *dstSlot; + if (!slot->IsDestinationCompatibleWith(inputArrayObject)) + { + SendPathRestart(NSN_Input); + return NOS_RESULT_FAILURE; + } + slot->FrameNumber = params.FrameNumber; + auto res = slot->CopyFrom(inputObject); + Ring.EndPush(); + SendRingStats("Post Push"); + if (res != NOS_RESULT_SUCCESS) + return res; + } + else if (Ring.IsShuttingDown()) + { + return NOS_RESULT_FAILED; + } + else + { + // Timeout + return NOS_RESULT_PENDING; + } + if (pushCount == 0) + nosEngine.LogW("%s: Item count in array exceeded input ratio!", GetItemPath(NodeId).value_or("").c_str()); + if (pushCount != 0) + --pushCount; + } + if (pushCount > 0) + nosEngine.LogW("%s: Fewer items in input array than specified in ratio!", GetItemPath(NodeId).value_or("").c_str()); + return NOS_RESULT_SUCCESS; + } +}; + +nosResult RegisterFrameRateConverter(nosNodeFunctions* node) +{ + NOS_BIND_NODE_CLASS(NOS_NAME("FrameRateConverter"), FrameRateConverterNode, node) + return NOS_RESULT_SUCCESS; +} + +} \ No newline at end of file diff --git a/Plugins/nosReflect/Source/PluginMain.cpp b/Plugins/nosReflect/Source/PluginMain.cpp index a9a5ecde..6fef5ca4 100644 --- a/Plugins/nosReflect/Source/PluginMain.cpp +++ b/Plugins/nosReflect/Source/PluginMain.cpp @@ -39,6 +39,7 @@ enum Nodes : size_t CopyingBoundedQueue, ObjectRingBuffer, BoundedObjectQueue, + FrameRateConverter, Count }; @@ -63,6 +64,7 @@ nosResult RegisterCopyingRingBuffer(nosNodeFunctions* node); nosResult RegisterCopyingBoundedQueue(nosNodeFunctions* node); nosResult RegisterObjectRingBuffer(nosNodeFunctions* node); nosResult RegisterBoundedObjectQueue(nosNodeFunctions* node); +nosResult RegisterFrameRateConverter(nosNodeFunctions* node); nosResult NOSAPI_CALL ExportNodeFunctions(size_t* outCount, nosNodeFunctions** outFunctions) { @@ -104,6 +106,7 @@ nosResult NOSAPI_CALL ExportNodeFunctions(size_t* outCount, nosNodeFunctions** o GEN_CASE_NODE(CopyingBoundedQueue) GEN_CASE_NODE(ObjectRingBuffer) GEN_CASE_NODE(BoundedObjectQueue) + GEN_CASE_NODE(FrameRateConverter) } } diff --git a/Plugins/nosReflect/Source/RingBuffer.hpp b/Plugins/nosReflect/Source/RingBuffer.hpp index a34ef709..34e12781 100644 --- a/Plugins/nosReflect/Source/RingBuffer.hpp +++ b/Plugins/nosReflect/Source/RingBuffer.hpp @@ -7,27 +7,23 @@ #include #include "Names.h" +#include "nosReflect/Reflect_generated.h" namespace nos::reflect { -enum class ServeMode -{ - WaitUntilFull, // Ring Buffer - Immediate // Bounded Queue -}; - -template +template class RingBuffer { public: - explicit RingBuffer(size_t capacity) + explicit RingBuffer(size_t capacity, RingBufferServeMode mode = RingBufferServeMode::WaitUntilFull) : Capacity(capacity), - Buffer(capacity), + Buffer(), Head(0), Tail(0), Size(0), ExitRequested(false) { + Reset(capacity, mode); } RingBuffer(const RingBuffer&) = delete; @@ -53,9 +49,9 @@ class RingBuffer std::unique_lock lock(Mutex); Head = (Head + 1) % Capacity; ++Size; - if (!WaitUntilFull || Size == Capacity) + if (State == RingState::Filling && Size == Capacity) { - WaitUntilFull = false; + State = RingState::Serving; ReadyForPopCV.notify_one(); } } @@ -67,7 +63,7 @@ class RingBuffer [this]() -> bool { if (ExitRequested) return true; - if (WaitUntilFull) + if (State == RingState::Filling) return Size == Capacity; return Size > 0; })) @@ -124,7 +120,7 @@ class RingBuffer return Capacity; } - void Reset(std::optional newCapacity = std::nullopt) + void Reset(std::optional newCapacity = std::nullopt, std::optional newMode = std::nullopt) { std::unique_lock lock(Mutex); Head = 0; @@ -134,13 +130,16 @@ class RingBuffer if (newCapacity && *newCapacity != Capacity) Capacity = *newCapacity; Buffer.resize(Capacity); - if constexpr (Mode == ServeMode::WaitUntilFull) + if (newMode) + Mode = *newMode; + switch (Mode) { - WaitUntilFull = true; - } - else if constexpr (Mode == ServeMode::Immediate) - { - WaitUntilFull = false; + case RingBufferServeMode::ServeImmediately: + State = RingState::Serving; + break; + case RingBufferServeMode::WaitUntilFull: + State = RingState::Filling; + break; } lock.unlock(); ExitRequested.store(false); @@ -148,6 +147,11 @@ class RingBuffer ReadyForPushCV.notify_all(); } + RingBufferServeMode GetMode() const + { + return Mode; + } + private: size_t Capacity; std::vector Buffer; @@ -155,11 +159,18 @@ class RingBuffer size_t Tail; size_t Size; + enum class RingState + { + Filling, + Serving + }; + mutable std::mutex Mutex; std::condition_variable ReadyForPopCV; std::condition_variable ReadyForPushCV; std::atomic_bool ExitRequested; - std::atomic_bool WaitUntilFull; + RingState State = RingState::Filling; + RingBufferServeMode Mode = RingBufferServeMode::WaitUntilFull; }; struct CopyingSlot : transfer::Slot @@ -192,19 +203,39 @@ struct ObjectSlot }; -template +template struct RingBufferNodeBase : NodeContext { nosName TypeName = NSN_TypeNameGeneric; - RingBuffer, Mode> Ring; + RingBuffer> Ring; uint32_t Capacity = 1; uint32_t RemainingRepeatCount = 0; - + bool CapacityUpdatedViaPathCommand = false; - - RingBufferNodeBase() : Ring(1) + + RingBufferNodeBase(RingBufferServeMode mode) : Ring(1, mode) { + AddPinValueWatcher(NOS_NAME("Capacity"), [this](const uint32_t* newCapacity, std::optional oldCapacity) + { + if (*newCapacity != Capacity) + { + Capacity = std::max(1u, *newCapacity); + if (*newCapacity != Capacity) + { + nosEngine.LogW("%s: Capacity cannot be %lu.", GetItemPath(NodeId).value_or("").c_str(), *newCapacity); + SetPinValue(NOS_NAME("Capacity"), Capacity); + return; + } + if (!CapacityUpdatedViaPathCommand) + { + nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity}; + nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange); + CapacityUpdatedViaPathCommand = false; + } + SendPathRestart(NSN_Input); + } + }); } void SendRingStats(std::string_view state) const @@ -245,7 +276,7 @@ struct RingBufferNodeBase : NodeContext nosResult CopyFrom(nosCopyFromInfo* cpy) override { SendRingStats("Pre Begin Pop"); - if constexpr (Mode == ServeMode::WaitUntilFull) + if (Ring.GetMode() == RingBufferServeMode::WaitUntilFull) { if (RemainingRepeatCount > 0) { @@ -293,25 +324,6 @@ struct RingBufferNodeBase : NodeContext } } - void OnPinObjectChanged(nos::Name pinName, uuid const& pinId, nosObjectId handle) override - { - if (NOS_NAME("Capacity") == pinName) - { - auto newCapacity = *InterpretObject(handle); - if (newCapacity != Capacity) - { - Capacity = std::max(1u, newCapacity); - if (!CapacityUpdatedViaPathCommand) - { - nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity}; - nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange); - CapacityUpdatedViaPathCommand = false; - } - SendPathRestart(NSN_Input); - } - } - } - void OnPinUpdated(nosPinUpdate const* update) override { if (TypeName != NSN_TypeNameGeneric) diff --git a/Plugins/nosReflect/Source/RingBufferNodes.cpp b/Plugins/nosReflect/Source/RingBufferNodes.cpp index eac351fd..3549d510 100644 --- a/Plugins/nosReflect/Source/RingBufferNodes.cpp +++ b/Plugins/nosReflect/Source/RingBufferNodes.cpp @@ -12,8 +12,17 @@ namespace nos::reflect { -using CopyingRingBufferNode = RingBufferNodeBase; -using ObjectRingBufferNode = RingBufferNodeBase; +struct ObjectRingBufferNode : RingBufferNodeBase +{ + ObjectRingBufferNode() + : RingBufferNodeBase(RingBufferServeMode::WaitUntilFull) {} +}; + +struct CopyingRingBufferNode : RingBufferNodeBase +{ + CopyingRingBufferNode() + : RingBufferNodeBase(RingBufferServeMode::WaitUntilFull) {} +}; nosResult RegisterCopyingRingBuffer(nosNodeFunctions* funcs) { diff --git a/Plugins/nosReflect/Types/Reflect.fbs b/Plugins/nosReflect/Types/Reflect.fbs index 837da4f6..7a03153b 100644 --- a/Plugins/nosReflect/Types/Reflect.fbs +++ b/Plugins/nosReflect/Types/Reflect.fbs @@ -8,4 +8,10 @@ enum BinaryOperator : uint DIV, EXP, LOG, +} + +enum RingBufferServeMode : uint +{ + WaitUntilFull, + ServeImmediately, } \ No newline at end of file From b509551c4404c801102bb02637e2b1fa0d8f1ddc Mon Sep 17 00:00:00 2001 From: "M. Samil Atesoglu" Date: Wed, 7 Jan 2026 16:49:36 +0300 Subject: [PATCH 2/4] FrameRateConverter: Improve robustness and add status messages --- .../nosReflect/Source/FrameRateConverter.cpp | 287 ++++++++++-------- Plugins/nosReflect/Source/RingBuffer.hpp | 65 +++- 2 files changed, 229 insertions(+), 123 deletions(-) diff --git a/Plugins/nosReflect/Source/FrameRateConverter.cpp b/Plugins/nosReflect/Source/FrameRateConverter.cpp index 040e6c18..4f659eb7 100644 --- a/Plugins/nosReflect/Source/FrameRateConverter.cpp +++ b/Plugins/nosReflect/Source/FrameRateConverter.cpp @@ -5,52 +5,78 @@ namespace nos::reflect struct FrameRateConverterNode : NodeContext { + struct Slot + { + ObjectRef Object; + uint64_t FrameNumber = 0; + }; + nos::Name TypeName = NSN_TypeNameGeneric; - RingBuffer> Ring; + RingBuffer Ring; uint32_t Capacity = 1; + uint32_t EffectiveCapacity = 1; uint32_t RemainingRepeatCount = 0; fb::vec2u Ratio = {1, 1}; + enum class StatusType + { + Ratio, + Capacity, + }; + + std::unordered_map StatusMessages; + bool CapacityUpdatedViaPathCommand = false; - + FrameRateConverterNode() : Ring(1, RingBufferServeMode::WaitUntilFull) { - Ring.Reset(Capacity); - AddPinValueWatcher(NOS_NAME("Capacity"), [this](const uint32_t* newCapacity, std::optional oldCapacity) + Ring.Reset(EffectiveCapacity); + AddPinValueWatcher(NOS_NAME("Capacity"), std::bind(&FrameRateConverterNode::OnCapacityPinValueChanged, this, std::placeholders::_1, std::placeholders::_2)); + AddPinValueWatcher(NOS_NAME("Ratio"), std::bind(&FrameRateConverterNode::OnRatioPinValueChanged, this, std::placeholders::_1, std::placeholders::_2)); + } + + void OnRatioPinValueChanged(fb::vec2u const* newRatio, std::optional oldRatio) + { + if (*newRatio == Ratio) + return; + if (newRatio->x() == 0 || newRatio->y() == 0) { - if (*newCapacity != Capacity) - { - Capacity = std::max(1u, *newCapacity); - if (*newCapacity != Capacity) - { - nosEngine.LogW("%s: Capacity cannot be %lu.", GetItemPath(NodeId).value_or("").c_str(), *newCapacity); - SetPinValue(NOS_NAME("Capacity"), Capacity); - return; - } - if (!CapacityUpdatedViaPathCommand) - { - nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity}; - nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange); - CapacityUpdatedViaPathCommand = false; - } - SendPathRestart(NSN_Input); - } - }); - AddPinValueWatcher(NOS_NAME("Ratio"), [this](const fb::vec2u* newRatio, std::optional oldRatio) + nosEngine.LogW("%s: Ratio components cannot be 0.", GetItemPath(NodeId).value_or("").c_str()); + SetPinValue(NOS_NAME("Ratio"), Ratio); + return; + } + Ratio = *newRatio; + auto lcm = std::lcm(Ratio.x(), Ratio.y()); + auto requiredEffectiveCapacity = lcm * Capacity; + if (requiredEffectiveCapacity != EffectiveCapacity) { - if (*newRatio != Ratio) - { - if (newRatio->x() == 0 || newRatio->y() == 0) - { - nosEngine.LogW("%s: Ratio components cannot be 0.", GetItemPath(NodeId).value_or("").c_str()); - SetPinValue(NOS_NAME("Ratio"), Ratio); - return; - } - Ratio = *newRatio; - SendPathRestart(NodeId); - } - }); + EffectiveCapacity = requiredEffectiveCapacity; + } + SendPathRestart(NodeId); + } + + void OnCapacityPinValueChanged(uint32_t const* newCapacity, std::optional oldCapacity) + { + if (*newCapacity == Capacity) + return; + Capacity = std::max(1u, *newCapacity); + EffectiveCapacity = std::lcm(Ratio.x(), Ratio.y()) * Capacity; + if (*newCapacity != Capacity) + { + nosEngine.LogW("%s: Capacity cannot be %u.", + GetItemPath(NodeId).value_or("").c_str(), + *newCapacity); + SetPinValue(NOS_NAME("Capacity"), Capacity); + return; + } + if (!CapacityUpdatedViaPathCommand) + { + nosPathCommand ringSizeChange{.Event = NOS_RING_SIZE_CHANGE, .RingSize = Capacity}; + nosEngine.SendPathCommand(*GetPinId(NSN_Input), ringSizeChange); + CapacityUpdatedViaPathCommand = false; + } + SendPathRestart(NSN_Input); } void SendRingStats(const char* state) const @@ -63,15 +89,16 @@ struct FrameRateConverterNode : NodeContext void OnPathStart() override { - Ring.Reset(Capacity); + nosEngine.LogD("%s: Effective Capacity set to %u", GetItemPath(NodeId).value_or("").c_str(), EffectiveCapacity); + SetStatus(StatusType::Capacity, + "Capacity: " + std::to_string(Capacity) + " (Effective: " + std::to_string(EffectiveCapacity) + ")", + fb::NodeStatusMessageType::INFO); + Ring.Reset(EffectiveCapacity); RemainingRepeatCount = Capacity - 1; SendScheduleRequest(Capacity); } - void OnPathStop() override - { - Ring.Shutdown(); - } + void OnPathStop() override { Ring.Shutdown(); } nosResult OnCreate(nosFbNodePtr node) override { @@ -99,22 +126,21 @@ struct FrameRateConverterNode : NodeContext return NOS_RESULT_SUCCESS; } } - uint32_t popCount = Ratio.y(); - std::vector outputObjectRefs; - uint64_t frameNumber = 0; - while (popCount > 0) - { - std::unique_ptr* srcSlot; + std::vector outputObjectRefs; + uint64_t frameNumber = 0; + uint32_t popCount = Ratio.y(); + std::optional> maybeSrcSlots; { - ScopedProfilerEvent _({ .Name = "Wait For Read" }); - srcSlot = Ring.BeginPop(100); + ScopedProfilerEvent _({.Name = "Wait For Read"}); + maybeSrcSlots = Ring.BeginPopMultiple(popCount, 100); } - if (srcSlot && *srcSlot) + if (maybeSrcSlots) { - auto& slot = *srcSlot; - frameNumber = slot->FrameNumber; - outputObjectRefs.push_back(std::move(slot->Object)); - Ring.EndPop(); + auto& srcSlots = *maybeSrcSlots; + for (auto& srcSlot : srcSlots) + outputObjectRefs.push_back(std::move(srcSlot->Object)); + frameNumber = srcSlots[0]->FrameNumber; + Ring.EndPopMultiple(popCount); SendRingStats("Post Begin Pop"); } else if (Ring.IsShuttingDown()) @@ -124,29 +150,27 @@ struct FrameRateConverterNode : NodeContext else { // Timeout - if (outputObjectRefs.empty()) - return NOS_RESULT_PENDING; - break; + return NOS_RESULT_PENDING; + } + if (!outputObjectRefs.empty()) + { + // Convert ObjectRefs to IDs for the API call + std::vector outputObjects; + outputObjects.reserve(outputObjectRefs.size()); + for (const auto& ref : outputObjectRefs) + outputObjects.push_back(ref.GetObjectId()); + + ObjectRef outputArrayObject; + auto res = nosEngine.ObjectAPI->CreateArrayObject( + TypeName, outputObjects.data(), outputObjects.size(), &outputArrayObject.GetStorage()); + if (res != NOS_RESULT_SUCCESS) + return res; + SetPinObject(NSN_Output, outputArrayObject); + cpy->ShouldSetSourceFrameNumber = true; + cpy->FrameNumber = frameNumber; + SendScheduleRequest(1); + return NOS_RESULT_SUCCESS; } - --popCount; - } - if (!outputObjectRefs.empty()) - { - // Convert ObjectRefs to IDs for the API call - std::vector outputObjects; - outputObjects.reserve(outputObjectRefs.size()); - for (const auto& ref : outputObjectRefs) - outputObjects.push_back(ref.GetObjectId()); - - ObjectRef outputArrayObject; - auto res = nosEngine.ObjectAPI->CreateArrayObject( - TypeName, outputObjects.data(), outputObjects.size(), &outputArrayObject.GetStorage()); - if (res != NOS_RESULT_SUCCESS) - return res; - SetPinObject(NSN_Output, outputArrayObject); - SendScheduleRequest(1); - return NOS_RESULT_SUCCESS; - } return NOS_RESULT_PENDING; } @@ -155,17 +179,16 @@ struct FrameRateConverterNode : NodeContext switch (command->Event) { case NOS_RING_SIZE_CHANGE: { - if (command->RingSize == 0) - { - nosEngine.LogW((GetDisplayName() + " capacity cannot be 0.").c_str()); - return; - } - CapacityUpdatedViaPathCommand = true; - SetPinValue(NOS_NAME("Capacity"), command->RingSize); - break; + if (command->RingSize == 0) + { + nosEngine.LogW((GetDisplayName() + " capacity cannot be 0.").c_str()); + return; + } + CapacityUpdatedViaPathCommand = true; + SetPinValue(NOS_NAME("Capacity"), command->RingSize); + break; } - default: - return; + default: return; } } @@ -205,52 +228,74 @@ struct FrameRateConverterNode : NodeContext ArrayObjectRef inputArrayObject = params.GetPinObject(NSN_Input); if (!inputArrayObject) return NOS_RESULT_FAILURE; - auto capacity = *InterpretObject(*params[NOS_NAME("Capacity")].Object); - capacity = std::max(1u, capacity); SendRingStats("Pre Push"); uint32_t pushCount = Ratio.x(); - for (auto inputObject : inputArrayObject) + std::optional> maybeDstSlots; { - std::unique_ptr* dstSlot; - { - ScopedProfilerEvent _({ .Name = "Wait For Empty Slot" }); - dstSlot = Ring.BeginPush(100); - } - if (dstSlot) + ScopedProfilerEvent _({.Name = "Wait For Empty Slot"}); + maybeDstSlots = Ring.BeginPushMultiple(pushCount, 100); + } + if (maybeDstSlots) + { + auto& dstSlots = *maybeDstSlots; + auto inSize = inputArrayObject.GetSize(); + if (inSize != pushCount) { - if (!*dstSlot) - *dstSlot = std::make_unique(inputArrayObject); - auto& slot = *dstSlot; - if (!slot->IsDestinationCompatibleWith(inputArrayObject)) - { - SendPathRestart(NSN_Input); - return NOS_RESULT_FAILURE; - } - slot->FrameNumber = params.FrameNumber; - auto res = slot->CopyFrom(inputObject); - Ring.EndPush(); + SetStatus(StatusType::Ratio, + "Input array size (" + std::to_string(inSize) + ") does not match required input size (" + + std::to_string(pushCount) + ")!", + fb::NodeStatusMessageType::FAILURE); + Ring.EndPushMultiple(pushCount); SendRingStats("Post Push"); - if (res != NOS_RESULT_SUCCESS) - return res; - } - else if (Ring.IsShuttingDown()) - { return NOS_RESULT_FAILED; } - else + // TODO: Maybe a more understandable message here? + SetStatus(StatusType::Ratio, "In " + std::to_string(Ratio.x()) + ":" + std::to_string(Ratio.y()) + " Out", fb::NodeStatusMessageType::INFO); + uint32_t i = 0; + for (auto& elem : inputArrayObject) { - // Timeout - return NOS_RESULT_PENDING; + auto& dstSlot = dstSlots[i++]; + dstSlot->Object = elem; + dstSlot->FrameNumber = params.FrameNumber; } - if (pushCount == 0) - nosEngine.LogW("%s: Item count in array exceeded input ratio!", GetItemPath(NodeId).value_or("").c_str()); - if (pushCount != 0) - --pushCount; + Ring.EndPushMultiple(pushCount); + SendRingStats("Post Push"); + return NOS_RESULT_SUCCESS; } - if (pushCount > 0) - nosEngine.LogW("%s: Fewer items in input array than specified in ratio!", GetItemPath(NodeId).value_or("").c_str()); - return NOS_RESULT_SUCCESS; + if (Ring.IsShuttingDown()) + return NOS_RESULT_FAILED; + // Timeout + return NOS_RESULT_PENDING; + } + + void SetStatus(StatusType statusType, std::string const& message, fb::NodeStatusMessageType messageType) + { + auto msg = fb::TNodeStatusMessage{{}, message, messageType}; + if (StatusMessages[statusType] != msg) + { + StatusMessages[statusType] = msg; + UpdateStatus(); + } + } + + void ClearStatus(StatusType statusType) + { + auto it = StatusMessages.find(statusType); + if (it != StatusMessages.end()) + { + StatusMessages.erase(it); + UpdateStatus(); + } + } + + void UpdateStatus() + { + ClearNodeStatusMessages(); + std::vector messages; + for (auto const& [_, msg] : StatusMessages) + messages.push_back(msg); + SetNodeStatusMessages(messages); } }; @@ -260,4 +305,4 @@ nosResult RegisterFrameRateConverter(nosNodeFunctions* node) return NOS_RESULT_SUCCESS; } -} \ No newline at end of file +} // namespace nos::reflect \ No newline at end of file diff --git a/Plugins/nosReflect/Source/RingBuffer.hpp b/Plugins/nosReflect/Source/RingBuffer.hpp index 34e12781..2718fc8b 100644 --- a/Plugins/nosReflect/Source/RingBuffer.hpp +++ b/Plugins/nosReflect/Source/RingBuffer.hpp @@ -44,6 +44,24 @@ class RingBuffer return &Buffer[Head]; } + std::optional> BeginPushMultiple(size_t count, uint32_t timeoutMs) + { + std::unique_lock lock(Mutex); + if (!ReadyForPushCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), + [this, count]() -> bool { + return (Size + count) <= Capacity || ExitRequested.load(); + })) + return std::nullopt; // timeout + + if (ExitRequested.load()) + return std::nullopt; + + std::vector result(count); + for (size_t i = 0; i < count; ++i) + result[i] = &Buffer[(Head + i) % Capacity]; + return result; + } + void EndPush() { std::unique_lock lock(Mutex); @@ -56,6 +74,18 @@ class RingBuffer } } + void EndPushMultiple(size_t count) + { + std::unique_lock lock(Mutex); + Head = (Head + count) % Capacity; + Size += count; + if (State == RingState::Filling && Size == Capacity) + { + State = RingState::Serving; + ReadyForPopCV.notify_all(); + } + } + T* BeginPop(uint32_t timeoutMs) { std::unique_lock lock(Mutex); @@ -75,6 +105,28 @@ class RingBuffer return &Buffer[Tail]; } + std::optional> BeginPopMultiple(size_t count, uint32_t timeoutMs) + { + std::unique_lock lock(Mutex); + if (!ReadyForPopCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), + [this, count]() -> bool { + if (ExitRequested) + return true; + if (State == RingState::Filling) + return Size == Capacity; + return Size >= count; + })) + return std::nullopt; // timeout + + if (ExitRequested.load()) + return std::nullopt; + + std::vector result(count); + for (size_t i = 0; i < count; ++i) + result[i] = &Buffer[(Tail + i) % Capacity]; + return result; + } + void EndPop() { std::unique_lock lock(Mutex); @@ -85,6 +137,15 @@ class RingBuffer ReadyForPushCV.notify_one(); } + void EndPopMultiple(size_t count) + { + std::unique_lock lock(Mutex); + Tail = (Tail + count) % Capacity; + Size = (Size >= count) ? (Size - count) : 0; + lock.unlock(); + ReadyForPushCV.notify_all(); + } + void Shutdown() { ExitRequested.store(true); @@ -187,7 +248,7 @@ struct ObjectSlot ObjectSlot(ObjectRef obj) : Object(std::move(obj)) {} ObjectSlot(const ObjectSlot&) = delete; ObjectSlot& operator=(const ObjectSlot&) = delete; - nosResult CopyFrom(ObjectRef& obj) + nosResult CopyFrom(ObjectRef&& obj) { Object = std::move(obj); return NOS_RESULT_SUCCESS; @@ -368,7 +429,7 @@ struct RingBufferNodeBase : NodeContext return NOS_RESULT_FAILURE; } slot->FrameNumber = params.FrameNumber; - auto res = slot->CopyFrom(inputObject); + auto res = slot->CopyFrom(std::move(inputObject)); Ring.EndPush(); SendRingStats("Post Push"); if (res != NOS_RESULT_SUCCESS) From f72a876b6269959ae47dddd02bdfe9cdbd997e9e Mon Sep 17 00:00:00 2001 From: "M. Samil Atesoglu" Date: Wed, 7 Jan 2026 17:08:09 +0300 Subject: [PATCH 3/4] Simplify RingBuffer data structure --- .../nosReflect/Source/FrameRateConverter.cpp | 10 +-- Plugins/nosReflect/Source/RingBuffer.hpp | 63 ++++--------------- 2 files changed, 17 insertions(+), 56 deletions(-) diff --git a/Plugins/nosReflect/Source/FrameRateConverter.cpp b/Plugins/nosReflect/Source/FrameRateConverter.cpp index 4f659eb7..eeb2dfc4 100644 --- a/Plugins/nosReflect/Source/FrameRateConverter.cpp +++ b/Plugins/nosReflect/Source/FrameRateConverter.cpp @@ -132,7 +132,7 @@ struct FrameRateConverterNode : NodeContext std::optional> maybeSrcSlots; { ScopedProfilerEvent _({.Name = "Wait For Read"}); - maybeSrcSlots = Ring.BeginPopMultiple(popCount, 100); + maybeSrcSlots = Ring.BeginPop(popCount, 100); } if (maybeSrcSlots) { @@ -140,7 +140,7 @@ struct FrameRateConverterNode : NodeContext for (auto& srcSlot : srcSlots) outputObjectRefs.push_back(std::move(srcSlot->Object)); frameNumber = srcSlots[0]->FrameNumber; - Ring.EndPopMultiple(popCount); + Ring.EndPop(popCount); SendRingStats("Post Begin Pop"); } else if (Ring.IsShuttingDown()) @@ -234,7 +234,7 @@ struct FrameRateConverterNode : NodeContext std::optional> maybeDstSlots; { ScopedProfilerEvent _({.Name = "Wait For Empty Slot"}); - maybeDstSlots = Ring.BeginPushMultiple(pushCount, 100); + maybeDstSlots = Ring.BeginPush(pushCount, 100); } if (maybeDstSlots) { @@ -246,7 +246,7 @@ struct FrameRateConverterNode : NodeContext "Input array size (" + std::to_string(inSize) + ") does not match required input size (" + std::to_string(pushCount) + ")!", fb::NodeStatusMessageType::FAILURE); - Ring.EndPushMultiple(pushCount); + Ring.EndPush(pushCount); SendRingStats("Post Push"); return NOS_RESULT_FAILED; } @@ -259,7 +259,7 @@ struct FrameRateConverterNode : NodeContext dstSlot->Object = elem; dstSlot->FrameNumber = params.FrameNumber; } - Ring.EndPushMultiple(pushCount); + Ring.EndPush(pushCount); SendRingStats("Post Push"); return NOS_RESULT_SUCCESS; } diff --git a/Plugins/nosReflect/Source/RingBuffer.hpp b/Plugins/nosReflect/Source/RingBuffer.hpp index 2718fc8b..a4680e9c 100644 --- a/Plugins/nosReflect/Source/RingBuffer.hpp +++ b/Plugins/nosReflect/Source/RingBuffer.hpp @@ -17,10 +17,11 @@ class RingBuffer public: explicit RingBuffer(size_t capacity, RingBufferServeMode mode = RingBufferServeMode::WaitUntilFull) : Capacity(capacity), - Buffer(), + Buffer(capacity), Head(0), Tail(0), Size(0), + Mode(mode), ExitRequested(false) { Reset(capacity, mode); @@ -31,20 +32,13 @@ class RingBuffer T* BeginPush(uint32_t timeoutMs) { - std::unique_lock lock(Mutex); - if (!ReadyForPushCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), - [this]() -> bool { - return Size < Capacity || ExitRequested.load(); - })) - return nullptr; // timeout - - if (ExitRequested.load()) + auto ret = BeginPush(1, timeoutMs); + if (!ret) return nullptr; - - return &Buffer[Head]; + return (*ret)[0]; } - std::optional> BeginPushMultiple(size_t count, uint32_t timeoutMs) + std::optional> BeginPush(size_t count, uint32_t timeoutMs) { std::unique_lock lock(Mutex); if (!ReadyForPushCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), @@ -62,19 +56,7 @@ class RingBuffer return result; } - void EndPush() - { - std::unique_lock lock(Mutex); - Head = (Head + 1) % Capacity; - ++Size; - if (State == RingState::Filling && Size == Capacity) - { - State = RingState::Serving; - ReadyForPopCV.notify_one(); - } - } - - void EndPushMultiple(size_t count) + void EndPush(size_t count = 1) { std::unique_lock lock(Mutex); Head = (Head + count) % Capacity; @@ -88,24 +70,13 @@ class RingBuffer T* BeginPop(uint32_t timeoutMs) { - std::unique_lock lock(Mutex); - if (!ReadyForPopCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), - [this]() -> bool { - if (ExitRequested) - return true; - if (State == RingState::Filling) - return Size == Capacity; - return Size > 0; - })) - return nullptr; // timeout - - if (ExitRequested.load()) + auto ret = BeginPop(1, timeoutMs); + if (!ret) return nullptr; - - return &Buffer[Tail]; + return (*ret)[0]; } - std::optional> BeginPopMultiple(size_t count, uint32_t timeoutMs) + std::optional> BeginPop(size_t count, uint32_t timeoutMs) { std::unique_lock lock(Mutex); if (!ReadyForPopCV.wait_for(lock, std::chrono::milliseconds(timeoutMs), @@ -127,17 +98,7 @@ class RingBuffer return result; } - void EndPop() - { - std::unique_lock lock(Mutex); - Tail = (Tail + 1) % Capacity; - if (Size > 0) - --Size; - lock.unlock(); - ReadyForPushCV.notify_one(); - } - - void EndPopMultiple(size_t count) + void EndPop(size_t count = 1) { std::unique_lock lock(Mutex); Tail = (Tail + count) % Capacity; From 50153b47a602310c0ae918799a047a66799c8a65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 14:10:23 +0000 Subject: [PATCH 4/4] Initial plan