Conversation

@Flw5469 Flw5469 commented May 21, 2025

Implement #749 based on #769.
Fix according to #795 so that the above implementation works.

  1. Made the waiting threads wait on condition variables, as discussed in Add StateTracker::wait_state() with lock-free wakeup #749 (comment).
  2. According to Update Semaphore::timed_wait with sem_clockwait and converting domain #797 (comment), added a compile-time check in SConstruct for whether sem_clockwait exists, and switched the clock used by timed_wait on POSIX platforms to the CLOCK_MONOTONIC domain (see the sketch after this list). Documentation of CheckFunc can be found at https://scons.org/doc/2.3.1/HTML/scons-api/SCons.Conftest-pysrc.html#CheckFunc
  3. Added simple test cases for state_tracker's timeout and blocking behavior.
  4. Added a simple test case for the POSIX semaphore's timeout.
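For context, item 2 ends up as a compile-time switch inside Semaphore::timed_wait(). Below is a rough sketch of the intended shape, not the PR's exact code; sem_ and ts (the deadline already converted to a struct timespec) are illustrative, and the conversion itself is omitted:

#ifdef ROC_HAVE_SEM_CLOCKWAIT
    // sem_clockwait() takes an explicit clock, so the absolute deadline
    // can stay in the CLOCK_MONOTONIC domain.
    if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) {
        return true;
    }
#else
    // Fallback: sem_timedwait() only understands CLOCK_REALTIME deadlines.
    if (sem_timedwait(&sem_, &ts) == 0) {
        return true;
    }
#endif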

@rocstreaming-bot

🤖 Welcome! Thanks for your interest in contributing to the project!

Here is a short check-list to help you get started:

Creating pull request

  • Target PR to develop branch.
  • Include link to related issue in PR description.
  • Ensure all CI checks pass.

Code review

  • Mark PR as draft until it's ready. When ready, undraft and request review.
  • Don't resolve discussions by yourself, instead leave a comment or thumbs up.
  • Re-request review after addressing all discussions.

Refer to contribution guidelines for further details.

@rocstreaming-bot rocstreaming-bot added contrib PR not by a maintainer S-ready-for-review status: PR can be reviewed labels May 21, 2025
@rocstreaming-bot rocstreaming-bot added the S-needs-rebase status: PR has conflicts and should be rebased label Jun 5, 2025
@rocstreaming-bot

🤖 Pull request is currently unmergeable due to conflicts.
Please rebase on the up-to-date upstream branch, resolve merge conflicts, and force-push to the pull request's branch. Remember to use rebase with force-push instead of a regular merge.

@gavv gavv added S-review-in-progress status: PR is being reviewed and removed S-ready-for-review status: PR can be reviewed labels Jun 6, 2025
Member

@gavv gavv left a comment

Thanks for the PR! Here is my review.

Also sorry for merge conflicts, Atomic<T> was recently split into AtomicInt<T> + AtomicBool.

Comment on lines +958 to +968
#Check for existence of function sem_clockwait
    temp_conf = Configure(env)
    header = """
#ifdef __cplusplus
extern "C"
#endif
char sem_timedwait(void);
"""

    if temp_conf.CheckFunc('sem_clockwait', header):
        env.Append(CPPDEFINES=['ROC_HAVE_SEM_CLOCKWAIT'])
Member

Is there a reason why you defined the function manually instead of including semaphore.h?

If not, I suggest using the include. Also, let's move it to the section where we do other checks and already have conf:

diff --git a/SConstruct b/SConstruct
index f01da593..a5da8609 100644
--- a/SConstruct
+++ b/SConstruct
@@ -687,6 +687,9 @@ if meta.platform in ['linux', 'unix']:
 elif meta.platform in ['android']:
     meta.gnu_toolchain = True
 
+if conf.CheckFunc('sem_clockwait', header="#include <semaphore.h>\n"):
+    conf.env.Append(CPPDEFINES=['ROC_HAVE_SEM_CLOCKWAIT'])
+
 conf.env['ROC_SYSTEM_BINDIR'] = GetOption('bindir')
 conf.env['ROC_SYSTEM_INCDIR'] = GetOption('incdir')
 
@@ -955,20 +958,6 @@ if meta.compiler in ['gcc', 'clang']:
     if meta.platform in ['linux', 'darwin']:
         env.AddManualDependency(libs=['pthread'])
 
-#Check for existence of function sem_clockwait 
-    temp_conf = Configure(env)
-    header = """
-#ifdef __cplusplus
-extern "C"
-#endif
-char sem_timedwait(void);
-"""
-
-    if temp_conf.CheckFunc('sem_clockwait', header):
-        env.Append(CPPDEFINES=['ROC_HAVE_SEM_CLOCKWAIT'])
-
-
-
     if meta.platform in ['linux', 'android'] or meta.gnu_toolchain:
         if not GetOption('disable_soversion'):
             subenvs.public_libs['SHLIBSUFFIX'] = '{}.{}'.format(

Comment on lines +64 to 67
#else
    if (sem_timedwait(&sem_, &ts) == 0) {
        return true;
    }
Member

After giving this a second thought, it seems that sem_timedwait() is a really bad thing to use when we don't have sem_clockwait():

  1. If the system time changes after we compute converted_deadline, but before we call sem_timedwait, there is a risk of sleeping practically forever.

  2. If the system time changes during sem_timedwait, there is the same risk, depending on the platform. Some platforms will convert the deadline to a relative time, but some will wait until the absolute time point taking clock changes into account - which is the worst case for us. BTW it seems that's how Linux works. (See the sketch below.)
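To make the risk concrete, here is a rough illustration (not the PR's code; names and normalization are simplified) of the conversion that sem_timedwait() forces on a CLOCK_MONOTONIC deadline, since it only understands CLOCK_REALTIME:

#include <semaphore.h>
#include <time.h>

// Illustrative sketch: translate a monotonic deadline into a wall-clock
// deadline, because sem_timedwait() interprets it against CLOCK_REALTIME.
static int wait_deadline_mono(sem_t* sem, const struct timespec* deadline_mono) {
    struct timespec now_mono, now_real, ts;
    clock_gettime(CLOCK_MONOTONIC, &now_mono);
    clock_gettime(CLOCK_REALTIME, &now_real);

    // ts = now_real + (deadline_mono - now_mono); negative tv_nsec
    // normalization omitted for brevity.
    ts.tv_sec = now_real.tv_sec + (deadline_mono->tv_sec - now_mono.tv_sec);
    ts.tv_nsec = now_real.tv_nsec + (deadline_mono->tv_nsec - now_mono.tv_nsec);
    if (ts.tv_nsec >= 1000000000L) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000L;
    }

    // If the wall clock jumps between the conversion above and the call below
    // (or during the call, on platforms that honor the absolute time point),
    // this wait can block far longer than intended.
    return sem_timedwait(sem, &ts);
}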

Then I checked the actual availability of sem_clockwait() on other POSIX OSes:

  • Linux has it
  • FreeBSD and QNX have close alternatives (sem_clockwait_np and sem_timedwait_monotonic)
  • it seems that OpenBSD, NetBSD, Solaris, and Redox don't have it
  • macOS doesn't have it but we don't need it there

This means that we're going to use sem_timedwait on quite a few platforms and our code will potentially hang on system time change.

An alternative is to implement the semaphore using a cond variable on platforms without sem_clockwait(). POSIX cond vars also have a timedwait variant (pthread_cond_timedwait). By default it uses CLOCK_REALTIME and so has the same problem as sem_timedwait. But if the platform supports pthread_condattr_setclock(), you can use CLOCK_MONOTONIC.

The good news is that pthread_condattr_setclock(CLOCK_MONOTONIC) is supported on many more platforms than sem_clockwait: actually everything listed above except macOS has it (and macOS has a non-portable alternative).


Long story short, I think when sem_clockwait() is not available, we should implement the semaphore using a mutex + condition variable instead of sem_timedwait() - this will give us correct code on many more platforms.

(Sorry that I haven't realized it earlier).

So I suggest creating two separate semaphore implementations in two target directories:

  1. target_posix_sem - current implementation, but it will unconditionally use sem_clockwait()

  2. target_nosem - fallback implementation for platforms without proper semaphores, using mutex and condvar

(and we also have target_darwin which implements semaphore for macOS)

We can do it like this:

diff --git a/SConstruct b/SConstruct
index f01da593..9904c913 100644
--- a/SConstruct
+++ b/SConstruct
@@ -794,9 +794,15 @@ else:
         ])

     if meta.platform in ['linux', 'android', 'unix']:
-        env.Append(ROC_TARGETS=[
-            'target_posix_ext',
-        ])
+        if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']:
+            env.Append(ROC_TARGETS=[
+                'target_posix_sem',
+            ])
+        else:
+            env.Append(ROC_TARGETS=[
+                'target_nosem',
+            ])

     if meta.platform in ['linux', 'android', 'darwin'] or meta.gnu_toolchain:
         env.Append(ROC_TARGETS=[

We can rename src/internal_modules/roc_core/target_posix_ext to target_posix_sem (it doesn't have anything except semaphore) and enable it only if sem_clockwait() is present.

The fallback semaphore implementation can be placed into src/internal_modules/roc_core/target_nosem.

We already have core::Mutex and core::Cond with properly implemented timed_wait().
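For illustration, here is a rough sketch of what such a fallback could look like, using raw pthread calls directly (the real code would rather build on core::Mutex and core::Cond; the class name is illustrative and error handling is simplified):

#include <errno.h>
#include <pthread.h>
#include <time.h>

// Sketch only: a counting semaphore built on a mutex + cond var bound to
// CLOCK_MONOTONIC, so system time changes can't break timed_wait().
class FallbackSemaphore {
public:
    explicit FallbackSemaphore(unsigned count = 0)
        : count_(count) {
        pthread_mutex_init(&mutex_, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
        pthread_cond_init(&cond_, &attr);
        pthread_condattr_destroy(&attr);
    }

    void post() {
        pthread_mutex_lock(&mutex_);
        count_++;
        pthread_cond_signal(&cond_);
        pthread_mutex_unlock(&mutex_);
    }

    // deadline is an absolute timestamp in the CLOCK_MONOTONIC domain.
    bool timed_wait(const struct timespec& deadline) {
        pthread_mutex_lock(&mutex_);
        while (count_ == 0) {
            if (pthread_cond_timedwait(&cond_, &mutex_, &deadline) == ETIMEDOUT) {
                pthread_mutex_unlock(&mutex_);
                return false;
            }
        }
        count_--;
        pthread_mutex_unlock(&mutex_);
        return true;
    }

private:
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
    unsigned count_;
};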


What do you think, are you willing to implement this?

Comment on lines +50 to +53
roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline);
roc_log(roc::LogDebug, "time is %" PRId64 "\n", converted_deadline);
roc_log(roc::LogDebug, "now time is %" PRId64 "\n",
        core::timestamp(core::ClockMonotonic));
Member

This method can be in a hot path and called very frequently, so we shouldn't use logging here.

delete[] threads_ptr;
}

TEST(state_tracker, semaphore_test) {
Member

nit: Please move this to its own file (src/tests/roc_core/test_semaphore.cpp)
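For reference, a minimal skeleton of the suggested file could look like this (assuming the project's usual CppUTest layout; the group and test names are illustrative):

// src/tests/roc_core/test_semaphore.cpp (skeleton only)
#include <CppUTest/TestHarness.h>

#include "roc_core/semaphore.h"
#include "roc_core/time.h"

namespace roc {
namespace core {

TEST_GROUP(semaphore) {};

TEST(semaphore, timed_wait_timeout) {
    Semaphore sem;

    // Nobody posts the semaphore, so a short absolute deadline in the
    // ClockMonotonic domain should expire and timed_wait() should return false.
    CHECK(!sem.timed_wait(timestamp(ClockMonotonic) + 10 * Millisecond));
}

} // namespace core
} // namespace roc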

Comment on lines +181 to +182
if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic)))
roc_log(LogDebug, "true, unlocked by other threads");
Member

nit: Please always use {} with if/while/etc

Comment on lines +31 to +37
// This method should block until the state becomes any of the states specified by the
// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state
// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more
// states will be needed). Deadline should be an absolute timestamp.

// Questions:
// - When should the function return true vs false
Member

Let's expand this comment and move it to .h file:

Suggested change
// This method should block until the state becomes any of the states specified by the
// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state
// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more
// states will be needed). Deadline should be an absolute timestamp.
// Questions:
// - When should the function return true vs false
//! Wait for state change.
//!
//! @remarks
//! Blocks until the state becomes any of the states specified by the mask,
//! or deadline expires. E.g. if mask is ACTIVE | PAUSED, blocks until
//! state becomes either ACTIVE or PAUSED.
//!
//! Deadline should be an absolute timestamp in ClockMonotonic domain.
//! Zero deadline means check state and return immediately.
//! Negative deadline means no deadline.
//!
//! @returns
//! true if state matches the mask and false if deadline expired.
//!
//! @note
//! Remember that pipeline state may be outdated immediately after this
//! method returns (e.g. if new packet arrives concurrently).

// Questions:
// - When should the function return true vs false
bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) {
    waiting_mask_ = state_mask;
Member

It seems that waiting_mask_ is not used anywhere and can be removed.

Comment on lines +41 to +44
// If no state is specified in state_mask, return immediately
if (state_mask == 0) {
    return true;
}
Member

This check can be done before the loop.

Comment on lines +46 to +62
if (static_cast<unsigned>(get_state()) & state_mask) {
    waiting_mask_ = 0;
    return true;
}

if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) {
    waiting_mask_ = 0;
    return false;
}

if (sem_is_occupied_.compare_exchange(0, 1)) {
    if (deadline >= 0) {
        (void)sem_.timed_wait(deadline);

    } else {
        sem_.wait();
    }
Member

There is a race condition between wait_state() and signal_state_change().

Let's assume that T1 and T2 are two threads that called wait_state(ACTIVE) and register_packet(), respectively.

Imagine the following sequence of events.

  1. Initially, active_sessions_ == pending_packets_ == 0, state is IDLE, and there are no active waiters/signalers (sem_is_occupied_ == false, mutex unlocked, etc).

  2. T1: enters wait_state(ACTIVE)

  3. T1: if (get_state() & ACTIVE) { - condition returns false, T1 continues loop

  4. T2: enters register_packet()

  5. T2: pending_packets_++, then call signal_state_change()

  6. T2: if (sem_is_occupied_) { - condition returns false, T2 returns from signal_state_change() without signaling semaphore

  7. T1: if (sem_is_occupied_.compare_exchange(0, 1)) { - condition returns true, enters branch

  8. T1: sem_.wait(); - hang forever because T2 already exited without signaling semaphore

The problem is that we're checking the state before doing the CAS, but we're guaranteed to be notified of a state change only after a successful CAS.

Comment on lines +65 to +75
    waiting_con_.broadcast();

} else {
    core::Mutex::Lock lock(mutex_);

    if (deadline >= 0) {
        (void)waiting_con_.timed_wait(deadline);
    } else {
        waiting_con_.wait();
    }
}
Member

Here is another race, between the semaphore and mutex branches of concurrent wait_state() calls.

Let's assume three threads: W1 and W2 call wait_state() and S0 calls register_packet().

In this scenario, W1 and W2 initially entered wait_state() at the same time, but then W2 was delayed a bit.

  1. W1: if (get_state() & state_mask) { - condition is false

  2. W2: if (get_state() & state_mask) { - condition is false

  3. W1: if (sem_is_occupied_.compare_exchange(0, 1)) { - success, enters branch

  4. W1: blocks on sem_.wait();

  5. S0: increments pending_packets_, triggers semaphore

  6. W1: wakes up, calls waiting_con_.broadcast(), checks state and returns from wait_state()

  7. W2: core::Mutex::Lock lock(mutex_); - acquires mutex

  8. W2: waiting_con_.wait(); - blocks on cond var forever, because W1 called broadcast() before W2 called wait() and W1 exited already

The problem is that the semaphore branch triggers the cond var without holding the mutex, and that both branches check the state before acquiring the mutex.

Member

@gavv gavv Jun 7, 2025

Another thing I don't quite like in the current implementation is unnecessary wake-ups, also related to how the mutex is currently handled.

Imagine 3 threads all invoke wait_state() concurrently: thread A blocks on the semaphore, and threads B and C block on the cond var.

Then the state changes, but not to a state that these threads are waiting for:

  1. thread A is awakened by state change (via sem) and triggers cond var
  2. thread B is awakened (via cond var) and unlocks mutex
  3. thread C is awakened (via cond var) and unlocks mutex
  4. threads A, B, C check state and all decide that they need to sleep again
  5. threads A, B, C compete on CAS
  6. the winner, say it's thread A again, blocks on semaphore
  7. threads B and C compete on mutex
  8. say, thread B wins and acquires the mutex
  9. thread C blocks on mutex
  10. thread B blocks on cond var (releasing the mutex)
  11. thread C is awakened, and acquires mutex
  12. thread C blocks on cond var

Here, thread C goes to sleep in (9) and wakes up in (11), only to go to sleep again in (12).

And if there are more waiting threads (D, E, F ...), each of them will have that extra wake-up.

Member

@gavv gavv Jun 7, 2025

Both the race condition and the extra wake-ups can be eliminated by revising when we lock and unlock the mutex.

Something like this:

/// POINT P1 (before mutex lock)

mutex.lock()

is_sem_owner = sem_occupied.CAS(false, true)

while true {
    // Note that we check state: after locking the mutex, after doing the CAS,
    // and before sleeping,

    if (get_state() & state_mask) {

        if (is_sem_owner) {
            // We're sem owner and going to return.
            //
            // Other waiters can be in points P1 or P3, waiting for mutex or cond var,
            // but not in P2.x (because only sem owner can be in P2).
            //
            // If some waiter is in P1, it doesn't need to be signaled yet.
            // If some waiter is in P3, it could have entered there only when we were in P2,
            // which means that we already signaled it (see below).
            //
            // So we can safely return without signaling cond var.
            sem_occupied = false
        }

        mutex.unlock()
        return
    }

    if (is_sem_owner) {
        // Release mutex before blocking on sem
        mutex.unlock()

        /// POINT P2.A  (mutex unlocked)
        sem.wait()
        /// POINT P2.B  (mutex still unlocked)

        // Re-acquire mutex before proceeding
        mutex.lock()

        // We are sem owner and have re-acquired the mutex.
        //
        // Other waiters can be either in P1 or P3 (but not in P2.x).
        //
        // If some waiter is in P1, it doesn't need to be signaled yet.
        //
        // If some waiter is in P3, we should signal it from here.
        // Since we've locked the mutex, we can be sure that the waiter
        // already entered cond_var.wait(), which unlocked the mutex.
        // Hence, broadcast() is guaranteed to wake it up.

        cond_var.broadcast()
    } else {
        // The code above guarantees that sem owner will wake up us
        // on state change before returning.

        cond_var.wait()  /// POINT P3  (mutex unlocked during wait())
    }
}

This is just rough pseudo-code, feel free to play with it as you want. Also I guess my comments are too verbose for real code.

And regarding unnecessary wake-ups: not releasing the mutex across loop iterations helps the OS eliminate them.

E.g. on glibc, when we call cond_var.broadcast(), it will re-queue waiters from the condvar's futex to the mutex's futex. Since we're holding the lock, this won't trigger a wake-up. After we release the lock, one waiter will be awakened; when it releases the lock, the second one will be awakened, and so on.

In other words, on each state change, each waiter will wake up exactly once, check the state, and either return or go to sleep until the next state change.

Author

Thanks a lot for the review! I shifted my focus elsewhere, getting back on this now.

Author

@Flw5469 Flw5469 Sep 11, 2025

Is there a bug?
TLDR: A thread T2 that entered the cond var earlier will not take over the semaphore when the semaphore-owning thread returns. That leaves no thread accepting the signal from the semaphore (until a new thread enters the function).

T1 waits at point P2.A in sem.wait() for state A.
T2 waits at point P3 in cond_var.wait() for state B.
T3 is the signaling thread; the current state is state C.

1. T1: sem.wait()
2. T2: cond_var.wait()
3. T3: changes state to A (desired by T1), posts sem.
4. T1: acquires mutex, broadcasts cond_var, checks state, releases sem ownership (sem_occupied = false), releases mutex, returns.
5. T2: gets woken up from cond_var.wait(), checks state, re-enters cond_var.wait().
6. T3: changes state to B (desired by T2), posts sem.
7. T2: still waiting at cond_var.wait() (not wanted).

I am seeing if I can think of a better flow. Please let me know if I misunderstood something. Thanks!

Author

Oh nvm, it seems it's not a big deal - I can just move the CAS inside the loop.
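Roughly, adjusting the pseudo-code from above (untested sketch, same caveats as the original):

mutex.lock()

is_sem_owner = false

while true {
    if (get_state() & state_mask) {
        if (is_sem_owner) {
            sem_occupied = false
        }
        mutex.unlock()
        return
    }

    // Retry the CAS on every iteration, so that after the previous sem owner
    // returns, one of the cond-var waiters can take over the semaphore.
    if (!is_sem_owner) {
        is_sem_owner = sem_occupied.CAS(false, true)
    }

    if (is_sem_owner) {
        mutex.unlock()
        sem.wait()
        mutex.lock()
        cond_var.broadcast()
    } else {
        cond_var.wait()
    }
}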

@rocstreaming-bot rocstreaming-bot added the S-needs-revision status: Author should revise PR and address feedback label Jun 7, 2025
@gavv gavv removed the S-review-in-progress status: PR is being reviewed label Jun 7, 2025