From ee8cff71cfd5205005293996c5be6b15464fdf80 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 18:50:41 +0800 Subject: [PATCH 01/11] update --- .gitignore | 2 +- .../roc_core/slab_pool_impl.cpp | 13 ++ .../target_posix_ext/roc_core/semaphore.cpp | 34 +++- src/internal_modules/roc_core/timer.cpp | 7 + .../roc_pipeline/state_tracker.cpp | 133 ++++++++++++++- .../roc_pipeline/state_tracker.h | 19 ++- src/tests/roc_core/test_timer.cpp | 29 ++++ src/tests/roc_pipeline/test_state_tracker.cpp | 157 ++++++++++++++++++ 8 files changed, 380 insertions(+), 14 deletions(-) create mode 100644 src/tests/roc_pipeline/test_state_tracker.cpp diff --git a/.gitignore b/.gitignore index bac233e93..957cd5c82 100644 --- a/.gitignore +++ b/.gitignore @@ -58,4 +58,4 @@ debian/roc /Makefile .justfile .projectile -.unisonignore +.unisonignore \ No newline at end of file diff --git a/src/internal_modules/roc_core/slab_pool_impl.cpp b/src/internal_modules/roc_core/slab_pool_impl.cpp index 00e61d245..a4edf3b0b 100644 --- a/src/internal_modules/roc_core/slab_pool_impl.cpp +++ b/src/internal_modules/roc_core/slab_pool_impl.cpp @@ -130,6 +130,7 @@ size_t SlabPoolImpl::num_guard_failures() const { return num_guard_failures_; } +//let user get this piece of memory from slot void* SlabPoolImpl::give_slot_to_user_(Slot* slot) { slot->~Slot(); @@ -148,6 +149,7 @@ void* SlabPoolImpl::give_slot_to_user_(Slot* slot) { return memory; } +// take this memory from user, check if within same pool SlabPoolImpl::Slot* SlabPoolImpl::take_slot_from_user_(void* memory) { SlotHeader* slot_hdr = ROC_CONTAINER_OF((char*)memory - sizeof(SlotCanary), SlotHeader, data); @@ -186,6 +188,7 @@ SlabPoolImpl::Slot* SlabPoolImpl::take_slot_from_user_(void* memory) { return new (slot_hdr) Slot; } +// return a slot (locally) from free slot, used in allocate() SlabPoolImpl::Slot* SlabPoolImpl::acquire_slot_() { if (free_slots_.is_empty()) { allocate_new_slab_(); @@ -200,6 +203,7 @@ SlabPoolImpl::Slot* SlabPoolImpl::acquire_slot_() { return slot; } +//put the slot back to freeslot void SlabPoolImpl::release_slot_(Slot* slot) { if (n_used_slots_ == 0) { roc_panic("slab pool (%s): unpaired deallocation", name_); @@ -209,6 +213,7 @@ void SlabPoolImpl::release_slot_(Slot* slot) { free_slots_.push_front(*slot); } +// add free_slots size by keep adding new slabs bool SlabPoolImpl::reserve_slots_(size_t desired_slots) { if (desired_slots > free_slots_.size()) { increase_slab_size_(desired_slots - free_slots_.size()); @@ -223,6 +228,8 @@ bool SlabPoolImpl::reserve_slots_(size_t desired_slots) { return true; } +// add free slot size (a helper function to be called) +// keep doubling cur_slots until it is greater than desired_slots, but limit it under max_slots. void SlabPoolImpl::increase_slab_size_(size_t desired_slots) { if (desired_slots > slab_max_slots_ && slab_max_slots_ != 0) { desired_slots = slab_max_slots_; @@ -238,6 +245,9 @@ void SlabPoolImpl::increase_slab_size_(size_t desired_slots) { } } +// allocate memory with size of header+size of all slots right now. +// Put a slab on it and put its pointer into slabs list, +// fill the memory up with multiple slots, and put them into the freeslot list. (total number of slots will be slab_cur_slots_,) bool SlabPoolImpl::allocate_new_slab_() { const size_t slab_size_bytes = slot_offset_(slab_cur_slots_); @@ -254,10 +264,12 @@ bool SlabPoolImpl::allocate_new_slab_() { free_slots_.push_back(*slot); } + // grows the next slab size because I want to increase_slab_size_(slab_cur_slots_ * 2); return true; } +//remove freeslots and slabs void SlabPoolImpl::deallocate_everything_() { if (n_used_slots_ != 0) { if (report_guard_(SlabPool_LeakGuard)) { @@ -277,6 +289,7 @@ void SlabPoolImpl::deallocate_everything_() { } } +// put the memory into free slots void SlabPoolImpl::add_preallocated_memory_(void* memory, size_t memory_size) { if (memory == NULL) { roc_panic("slab pool (%s): preallocated memory is null", name_); diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp index 0d9685390..071642846 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp @@ -10,10 +10,17 @@ #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" #include "roc_core/panic.h" +#include "roc_core/log.h" #include #include +#ifdef sem_clockwait +#define HAS_SEM_CLOCKWAIT 1 +#else +#define HAS_SEM_CLOCKWAIT 0 +#endif + namespace roc { namespace core { @@ -34,19 +41,36 @@ Semaphore::~Semaphore() { } bool Semaphore::timed_wait(nanoseconds_t deadline) { + if (deadline < 0) { roc_panic("semaphore: unexpected negative deadline"); } + nanoseconds_t converted_deadline = deadline; + for (;;) { + + // WHAT IF CHANGE SYSTEM TIME WHEN -1? + if (!HAS_SEM_CLOCKWAIT) { + converted_deadline = deadline + core::timestamp(core::ClockUnix) - core::timestamp(core::ClockMonotonic); + } + timespec ts; - ts.tv_sec = long(deadline / Second); - ts.tv_nsec = long(deadline % Second); + ts.tv_sec = long(converted_deadline / Second); + ts.tv_nsec = long(converted_deadline % Second); + roc_log(LogDebug,"time to wait: %li",ts.tv_sec); - if (sem_timedwait(&sem_, &ts) == 0) { - return true; + if (HAS_SEM_CLOCKWAIT) { + if (sem_clockwait(&sem_, core::ClockMonotonic ,&ts) == 0) { + return true; + } + } else { + if (sem_timedwait(&sem_, &ts) == 0) { + return true; + } } + if (errno == ETIMEDOUT) { return false; } @@ -61,7 +85,7 @@ void Semaphore::wait() { for (;;) { if (sem_wait(&sem_) == 0) { return; - } + } if (errno != EINTR) { roc_panic("semaphore: sem_wait(): %s", errno_to_str().c_str()); } diff --git a/src/internal_modules/roc_core/timer.cpp b/src/internal_modules/roc_core/timer.cpp index 74b6e2798..66f834fc8 100644 --- a/src/internal_modules/roc_core/timer.cpp +++ b/src/internal_modules/roc_core/timer.cpp @@ -30,6 +30,8 @@ bool Timer::try_set_deadline(nanoseconds_t new_deadline) { next_wakeup = -1; } + // if 1. new deadline is earlier than the scheduled wakeup time; or 2. nextwakeup <0 ,so timer is not active + // post only if sem flag is not set (to aviod duplicate signaling) if (next_wakeup < 0 || (new_deadline >= 0 && new_deadline < next_wakeup)) { if (sem_post_flag_.compare_exchange(false, true)) { sem_.post(); @@ -41,14 +43,18 @@ bool Timer::try_set_deadline(nanoseconds_t new_deadline) { void Timer::wait_deadline() { for (;;) { + // set a lock on next_wakeup? next_wakeup_.exclusive_store(-1); const nanoseconds_t deadline = deadline_.wait_load(); + // continue if ddl is less than a clock (just because input is less, ddl is never decremented.) if (deadline >= 0 && deadline <= timestamp(ClockMonotonic)) { break; } + // wait forever if no deadline (will wakeup when other people set deadline) + // stuck here until sem timeout if have deadline (will I wake up if other people wake sem?) (do other people come to this statement too?) if (deadline > 0) { next_wakeup_.exclusive_store(deadline); (void)sem_.timed_wait(deadline); @@ -59,6 +65,7 @@ void Timer::wait_deadline() { sem_post_flag_ = false; } + //release the lock next_wakeup_.exclusive_store(0); } diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..f91a26ea9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -8,14 +8,104 @@ #include "roc_pipeline/state_tracker.h" #include "roc_core/panic.h" +#include "roc_core/log.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(false) + , waiting_mask_(0) + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + roc_log(LogDebug,"enter wait state"); + waiting_mask_ = state_mask; + for (;;) { + + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + roc_log(LogDebug, + "branch 1"); + return true; + } + + if (static_cast(get_state()) & state_mask) { + waiting_mask_ = 0; + roc_log(LogDebug, + "branch 2 (correct state) %u %u", static_cast(get_state()), state_mask); + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + roc_log(LogDebug, "branch 3 (timeout) %ld", core::timestamp(core::ClockMonotonic)); + waiting_mask_ = 0; + return false; + } + + // if (deadline >= 0) { + // if (!sem_.timed_wait(deadline)) { + // waiting_mask_ = 0; + // return false; + // } + // } else { + // sem_.wait(); + // } + + //change to CAS here. + if (sem_is_occupied_.compare_exchange(false, true)) { + + roc_log(LogDebug,"sleep on sem"); + + if (deadline >= 0) { + roc_log(LogDebug, "entering timed wait with deadline %ld, current time is: %ld", deadline, core::timestamp(core::ClockMonotonic)); + sem_.timed_wait(deadline); + roc_log(LogDebug, "exiting timed wait"); + + } else { + roc_log(LogDebug, "untimed wait"); + sem_.wait(); + } + + sem_is_occupied_ = false; + waiting_con_.broadcast(); + + + } else { + + core::Mutex::Lock lock(mutex_); + + roc_log(LogDebug,"sleep on cond"); + + if (deadline >= 0) { + waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + + } + roc_log(LogDebug,"finished this loop"); + + + } + roc_log(LogDebug,"exit wait state"); + } sndio::DeviceState StateTracker::get_state() const { @@ -65,24 +155,53 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); + } + } } // namespace pipeline -} // namespace roc +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 1f5bb1499..d02d3798a 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -14,7 +14,12 @@ #include "roc_core/atomic.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" +#include "roc_core/cond.h" +#include "roc_core/mutex.h" +#include "roc_core/atomic.h" #include "roc_sndio/device_state.h" namespace roc { @@ -32,6 +37,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,12 +71,21 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::Atomic halt_state_; core::Atomic active_sessions_; core::Atomic pending_packets_; + core::Atomic sem_is_occupied_; + core::Atomic waiting_mask_; + core::Cond waiting_con_; + core::Mutex mutex_; + + + + void signal_state_change(); }; } // namespace pipeline } // namespace roc -#endif // ROC_PIPELINE_STATE_TRACKER_H_ +#endif // ROC_PIPELINE_STATE_TRACKER_H_ \ No newline at end of file diff --git a/src/tests/roc_core/test_timer.cpp b/src/tests/roc_core/test_timer.cpp index c97c10a79..85ef91692 100644 --- a/src/tests/roc_core/test_timer.cpp +++ b/src/tests/roc_core/test_timer.cpp @@ -38,6 +38,7 @@ class TestThread : public Thread { private: virtual void run() { r_ = true; + // very likely this is the line that cause deadlock t_.wait_deadline(); r_ = false; } @@ -185,6 +186,34 @@ TEST(timer, async) { thr.join(); } } + { // repeat + Timer t; + int num = 2; + + TestThread* threads[3]; + set_deadline(t, 1 * Second); + + for (int i = 0; i < num; i++) { + threads[i] = new TestThread(t); // Dynamic allocation + CHECK(threads[i]->start()); + threads[i]->wait_running(); + + // moved this line into the loop and solved the never end issue + sleep_for(ClockMonotonic, Microsecond * 10000); + + } + + + for (int i = 0; i < num; i++) { + CHECK(threads[i]->running()); + } + set_deadline(t, 0); + for (int i = 0; i < num; i++) { + + threads[i]->join(); + delete threads[i]; // Free the memory + } + } } } // namespace core diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..8f501afc7 --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,157 @@ + /* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + private: + + virtual void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::Atomic r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +TEST(state_tracker, simple_timeout) { + + StateTracker state_tracker; + TestThread thr(state_tracker,sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Second * 0.1); + + CHECK(thr.start()); + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + CHECK(!(thr.running())); + thr.join(); +} + +TEST(state_tracker, multiple_timeout) { + + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i=0;i<10;i++){ + threads_ptr[i] = new TestThread(state_tracker,sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Second * 1); + } + + for (int i=0;i<10;i++){ + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug,"started running"); + core::sleep_for(core::ClockMonotonic, core::Microsecond * 1000); + + for (int i=0;i<10;i++){ + CHECK(threads_ptr[i]->running()); + } + + roc_log(LogDebug,"started joining"); + + for (int i=0;i<10;i++){ + threads_ptr[i]->join(); + } + + roc_log(LogDebug,"finished joining"); + +} + +TEST(state_tracker, multiple_switch) { + + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i=0;i<10;i++){ + threads_ptr[i] = new TestThread(state_tracker,sndio::DeviceState_Active, -1); + } + + for (int i=0;i<10;i++){ + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug,"started running"); + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + + for (int i=0;i<10;i++){ + CHECK(threads_ptr[i]->running()); + } + + core::sleep_for(core::ClockMonotonic,core::Second * 0.5); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic,core::Second * 0.5); + + for (int i=0;i<10;i++){ + CHECK(!(threads_ptr[i]->running())); + } + + roc_log(LogDebug,"started joining"); + for (int i=0;i<10;i++){ + threads_ptr[i]->join(); + } + roc_log(LogDebug,"finished joining"); +} + + +#include "roc_core/semaphore.h" +TEST(state_tracker, semaphore_test) { + core::Semaphore sem(0); + if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) + roc_log(LogDebug, "true"); + else roc_log(LogDebug, "false"); +} +} // namespace pipeline +} // namespace roc From 6e52eac06a0bc8d9af7882c79294806158928235 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 19:40:45 +0800 Subject: [PATCH 02/11] Update StateTracker::wait_state() --- .../roc_pipeline/state_tracker.cpp | 133 +++++++++++++++++- .../roc_pipeline/state_tracker.h | 19 ++- 2 files changed, 144 insertions(+), 8 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..f91a26ea9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -8,14 +8,104 @@ #include "roc_pipeline/state_tracker.h" #include "roc_core/panic.h" +#include "roc_core/log.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(false) + , waiting_mask_(0) + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + roc_log(LogDebug,"enter wait state"); + waiting_mask_ = state_mask; + for (;;) { + + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + roc_log(LogDebug, + "branch 1"); + return true; + } + + if (static_cast(get_state()) & state_mask) { + waiting_mask_ = 0; + roc_log(LogDebug, + "branch 2 (correct state) %u %u", static_cast(get_state()), state_mask); + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + roc_log(LogDebug, "branch 3 (timeout) %ld", core::timestamp(core::ClockMonotonic)); + waiting_mask_ = 0; + return false; + } + + // if (deadline >= 0) { + // if (!sem_.timed_wait(deadline)) { + // waiting_mask_ = 0; + // return false; + // } + // } else { + // sem_.wait(); + // } + + //change to CAS here. + if (sem_is_occupied_.compare_exchange(false, true)) { + + roc_log(LogDebug,"sleep on sem"); + + if (deadline >= 0) { + roc_log(LogDebug, "entering timed wait with deadline %ld, current time is: %ld", deadline, core::timestamp(core::ClockMonotonic)); + sem_.timed_wait(deadline); + roc_log(LogDebug, "exiting timed wait"); + + } else { + roc_log(LogDebug, "untimed wait"); + sem_.wait(); + } + + sem_is_occupied_ = false; + waiting_con_.broadcast(); + + + } else { + + core::Mutex::Lock lock(mutex_); + + roc_log(LogDebug,"sleep on cond"); + + if (deadline >= 0) { + waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + + } + roc_log(LogDebug,"finished this loop"); + + + } + roc_log(LogDebug,"exit wait state"); + } sndio::DeviceState StateTracker::get_state() const { @@ -65,24 +155,53 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); + } + } } // namespace pipeline -} // namespace roc +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 1f5bb1499..d02d3798a 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -14,7 +14,12 @@ #include "roc_core/atomic.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" +#include "roc_core/cond.h" +#include "roc_core/mutex.h" +#include "roc_core/atomic.h" #include "roc_sndio/device_state.h" namespace roc { @@ -32,6 +37,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,12 +71,21 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::Atomic halt_state_; core::Atomic active_sessions_; core::Atomic pending_packets_; + core::Atomic sem_is_occupied_; + core::Atomic waiting_mask_; + core::Cond waiting_con_; + core::Mutex mutex_; + + + + void signal_state_change(); }; } // namespace pipeline } // namespace roc -#endif // ROC_PIPELINE_STATE_TRACKER_H_ +#endif // ROC_PIPELINE_STATE_TRACKER_H_ \ No newline at end of file From ac123da2724204b151a9e443383d1034e2701224 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 19:54:31 +0800 Subject: [PATCH 03/11] format code --- .../roc_pipeline/state_tracker.cpp | 54 ++++--------------- .../roc_pipeline/state_tracker.h | 9 ++-- 2 files changed, 13 insertions(+), 50 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index f91a26ea9..0b1e2d45d 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -7,8 +7,8 @@ */ #include "roc_pipeline/state_tracker.h" -#include "roc_core/panic.h" #include "roc_core/log.h" +#include "roc_core/panic.h" namespace roc { namespace pipeline { @@ -35,77 +35,44 @@ StateTracker::StateTracker() // Questions: // - When should the function return true vs false bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { - roc_log(LogDebug,"enter wait state"); waiting_mask_ = state_mask; for (;;) { - // If no state is specified in state_mask, return immediately if (state_mask == 0) { - roc_log(LogDebug, - "branch 1"); return true; } if (static_cast(get_state()) & state_mask) { waiting_mask_ = 0; - roc_log(LogDebug, - "branch 2 (correct state) %u %u", static_cast(get_state()), state_mask); return true; } if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { - roc_log(LogDebug, "branch 3 (timeout) %ld", core::timestamp(core::ClockMonotonic)); waiting_mask_ = 0; return false; } - // if (deadline >= 0) { - // if (!sem_.timed_wait(deadline)) { - // waiting_mask_ = 0; - // return false; - // } - // } else { - // sem_.wait(); - // } - - //change to CAS here. if (sem_is_occupied_.compare_exchange(false, true)) { - - roc_log(LogDebug,"sleep on sem"); - if (deadline >= 0) { - roc_log(LogDebug, "entering timed wait with deadline %ld, current time is: %ld", deadline, core::timestamp(core::ClockMonotonic)); sem_.timed_wait(deadline); - roc_log(LogDebug, "exiting timed wait"); } else { - roc_log(LogDebug, "untimed wait"); sem_.wait(); } - + sem_is_occupied_ = false; waiting_con_.broadcast(); - } else { - core::Mutex::Lock lock(mutex_); - roc_log(LogDebug,"sleep on cond"); - - if (deadline >= 0) { - waiting_con_.timed_wait(deadline); - } else { - waiting_con_.wait(); - } - + if (deadline >= 0) { + waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } } - roc_log(LogDebug,"finished this loop"); - - } - roc_log(LogDebug,"exit wait state"); - } sndio::DeviceState StateTracker::get_state() const { @@ -197,11 +164,10 @@ void StateTracker::signal_state_change() { // sem_.post(); // } if (sem_is_occupied_) { - roc_log(LogDebug, "signaling"); - sem_.post(); + roc_log(LogDebug, "signaling"); + sem_.post(); } - } } // namespace pipeline -} // namespace roc \ No newline at end of file +} // namespace roc diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index d02d3798a..ef7dbdcce 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -13,13 +13,12 @@ #define ROC_PIPELINE_STATE_TRACKER_H_ #include "roc_core/atomic.h" +#include "roc_core/cond.h" +#include "roc_core/mutex.h" #include "roc_core/noncopyable.h" #include "roc_core/semaphore.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" -#include "roc_core/cond.h" -#include "roc_core/mutex.h" -#include "roc_core/atomic.h" #include "roc_sndio/device_state.h" namespace roc { @@ -79,8 +78,6 @@ class StateTracker : public core::NonCopyable<> { core::Atomic waiting_mask_; core::Cond waiting_con_; core::Mutex mutex_; - - void signal_state_change(); }; @@ -88,4 +85,4 @@ class StateTracker : public core::NonCopyable<> { } // namespace pipeline } // namespace roc -#endif // ROC_PIPELINE_STATE_TRACKER_H_ \ No newline at end of file +#endif // ROC_PIPELINE_STATE_TRACKER_H_ From 0b7bd0080f74d62291f1d2b83fe038f78a3bda11 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 21:16:18 +0800 Subject: [PATCH 04/11] Fix: add conversion for timed_wait and mutex initalization to prevent warnings in other platforms --- src/internal_modules/roc_pipeline/state_tracker.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index 0b1e2d45d..dc02bf602 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -20,6 +20,7 @@ StateTracker::StateTracker() , pending_packets_(0) , sem_is_occupied_(false) , waiting_mask_(0) + , mutex_() , waiting_con_(mutex_) { } @@ -54,7 +55,7 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl if (sem_is_occupied_.compare_exchange(false, true)) { if (deadline >= 0) { - sem_.timed_wait(deadline); + (void)sem_.timed_wait(deadline); } else { sem_.wait(); @@ -67,7 +68,7 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl core::Mutex::Lock lock(mutex_); if (deadline >= 0) { - waiting_con_.timed_wait(deadline); + (void)waiting_con_.timed_wait(deadline); } else { waiting_con_.wait(); } From 4241beaa848e3d006de6f6fff30035b1b5cf9e2e Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 21:24:43 +0800 Subject: [PATCH 05/11] fix: reordered initalization of mutex and conditional variable --- src/internal_modules/roc_pipeline/state_tracker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index ef7dbdcce..f52304447 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -76,8 +76,8 @@ class StateTracker : public core::NonCopyable<> { core::Atomic pending_packets_; core::Atomic sem_is_occupied_; core::Atomic waiting_mask_; - core::Cond waiting_con_; core::Mutex mutex_; + core::Cond waiting_con_; void signal_state_change(); }; From d6fae93dc3cad3262e20a8d174d5a307d2d7300e Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 26 Jan 2025 21:35:23 +0800 Subject: [PATCH 06/11] Add simple test cases for StateTracker::wait_state() --- src/tests/roc_pipeline/test_state_tracker.cpp | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 src/tests/roc_pipeline/test_state_tracker.cpp diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..de3904700 --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + virtual void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::Atomic r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +TEST(state_tracker, simple_timeout) { + StateTracker state_tracker; + TestThread thr(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Second * 0.1); + + CHECK(thr.start()); + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + CHECK(!(thr.running())); + thr.join(); +} + +TEST(state_tracker, multiple_timeout) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i = 0; i < 10; i++) { + threads_ptr[i] = + new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Second * 1); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Microsecond * 1000); + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + + roc_log(LogDebug, "started joining"); + + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + + roc_log(LogDebug, "finished joining"); +} + +TEST(state_tracker, multiple_switch) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + + for (int i = 0; i < 10; i++) { + CHECK(!(threads_ptr[i]->running())); + } + + roc_log(LogDebug, "started joining"); + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + roc_log(LogDebug, "finished joining"); +} + +#include "roc_core/semaphore.h" +TEST(state_tracker, semaphore_test) { + core::Semaphore sem(0); + if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) + roc_log(LogDebug, "true"); + else + roc_log(LogDebug, "false"); +} +} // namespace pipeline +} // namespace roc From 67fb8233324fc0ea2da813db685f98a6135ef520 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 12 Feb 2025 01:08:37 +0800 Subject: [PATCH 07/11] update: use millisecond instead of second as unit --- src/tests/roc_pipeline/test_state_tracker.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index de3904700..aaf781a83 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -74,10 +74,10 @@ TEST_GROUP(state_tracker) {}; TEST(state_tracker, simple_timeout) { StateTracker state_tracker; TestThread thr(state_tracker, sndio::DeviceState_Active, - core::timestamp(core::ClockMonotonic) + core::Second * 0.1); + core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); CHECK(thr.start()); - core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); thr.join(); } @@ -88,7 +88,7 @@ TEST(state_tracker, multiple_timeout) { for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, - core::timestamp(core::ClockMonotonic) + core::Second * 1); + core::timestamp(core::ClockMonotonic) + core::Millisecond * 1000); } for (int i = 0; i < 10; i++) { @@ -123,15 +123,15 @@ TEST(state_tracker, multiple_switch) { } roc_log(LogDebug, "started running"); - core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); } - core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); state_tracker.register_packet(); - core::sleep_for(core::ClockMonotonic, core::Second * 0.5); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); for (int i = 0; i < 10; i++) { CHECK(!(threads_ptr[i]->running())); From 1f797762f3bee18d68bb0663de6efdb0ed526ca8 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 12 Feb 2025 13:55:31 +0800 Subject: [PATCH 08/11] update: put import line location in front --- src/tests/roc_pipeline/test_state_tracker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index aaf781a83..994175364 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -19,6 +19,7 @@ #include "roc_pipeline/config.h" #include "roc_pipeline/receiver_endpoint.h" #include "roc_pipeline/receiver_session_group.h" +#include "roc_core/semaphore.h" namespace roc { namespace pipeline { @@ -144,7 +145,6 @@ TEST(state_tracker, multiple_switch) { roc_log(LogDebug, "finished joining"); } -#include "roc_core/semaphore.h" TEST(state_tracker, semaphore_test) { core::Semaphore sem(0); if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) From 8c77a53fcf6e2c61cfc96ec21e06797deaddbc75 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 15 Feb 2025 18:44:17 +0800 Subject: [PATCH 09/11] fix compatibility bugs --- src/internal_modules/roc_pipeline/state_tracker.cpp | 6 +++--- src/internal_modules/roc_pipeline/state_tracker.h | 2 +- src/tests/roc_pipeline/test_state_tracker.cpp | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index dc02bf602..9f0032638 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -18,7 +18,7 @@ StateTracker::StateTracker() , halt_state_(-1) , active_sessions_(0) , pending_packets_(0) - , sem_is_occupied_(false) + , sem_is_occupied_(0) , waiting_mask_(0) , mutex_() , waiting_con_(mutex_) { @@ -53,7 +53,7 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl return false; } - if (sem_is_occupied_.compare_exchange(false, true)) { + if (sem_is_occupied_.compare_exchange(0, 1)) { if (deadline >= 0) { (void)sem_.timed_wait(deadline); @@ -61,7 +61,7 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl sem_.wait(); } - sem_is_occupied_ = false; + sem_is_occupied_ = 0; waiting_con_.broadcast(); } else { diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index f52304447..fd99fd79c 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -74,7 +74,7 @@ class StateTracker : public core::NonCopyable<> { core::Atomic halt_state_; core::Atomic active_sessions_; core::Atomic pending_packets_; - core::Atomic sem_is_occupied_; + core::Atomic sem_is_occupied_; core::Atomic waiting_mask_; core::Mutex mutex_; core::Cond waiting_con_; diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 994175364..d997c0340 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -78,7 +78,7 @@ TEST(state_tracker, simple_timeout) { core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); CHECK(thr.start()); - core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); CHECK(!(thr.running())); thr.join(); } @@ -97,7 +97,7 @@ TEST(state_tracker, multiple_timeout) { } roc_log(LogDebug, "started running"); - core::sleep_for(core::ClockMonotonic, core::Microsecond * 1000); + core::sleep_for(core::ClockMonotonic, core::Microsecond * 2000); for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); From 8c7c6c3854f82946765a95f053ad5953f4bd07c2 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 15 Feb 2025 18:56:58 +0800 Subject: [PATCH 10/11] fix bug in checking --- src/tests/roc_pipeline/test_state_tracker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index d997c0340..72a9e1dc0 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -97,10 +97,10 @@ TEST(state_tracker, multiple_timeout) { } roc_log(LogDebug, "started running"); - core::sleep_for(core::ClockMonotonic, core::Microsecond * 2000); + core::sleep_for(core::ClockMonotonic, core::Microsecond * 5000); for (int i = 0; i < 10; i++) { - CHECK(threads_ptr[i]->running()); + CHECK(!threads_ptr[i]->running()); } roc_log(LogDebug, "started joining"); From ef89b19fbf6d777d93a69d8be76a620e3e427af4 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 16 Feb 2025 15:43:24 +0800 Subject: [PATCH 11/11] fix bug in test case --- src/tests/roc_pipeline/test_state_tracker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 72a9e1dc0..f406fe699 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -97,7 +97,7 @@ TEST(state_tracker, multiple_timeout) { } roc_log(LogDebug, "started running"); - core::sleep_for(core::ClockMonotonic, core::Microsecond * 5000); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); for (int i = 0; i < 10; i++) { CHECK(!threads_ptr[i]->running());