diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0d0a346b5984..ddf62562dca19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,6 +91,13 @@ jobs: miri: runs-on: macos-latest timeout-minutes: 60 + env: + # -Zrandomize-layout makes sure we dont rely on the layout of anything that might change + RUSTFLAGS: -Zrandomize-layout + # https://github.com/rust-lang/miri#miri--z-flags-and-environment-variables + # -Zmiri-disable-isolation is needed because our executor uses `fastrand` which accesses system time. + # -Zmiri-ignore-leaks is necessary because a bunch of tests don't join all threads before finishing. + MIRIFLAGS: -Zmiri-ignore-leaks -Zmiri-disable-isolation steps: - uses: actions/checkout@v5 - uses: actions/cache/restore@v4 @@ -111,17 +118,14 @@ jobs: with: toolchain: ${{ env.NIGHTLY_TOOLCHAIN }} components: miri - - name: CI job + - name: CI job (Tasks) + # To run the tests one item at a time for troubleshooting, use + # cargo --quiet test --lib -- --list | sed 's/: test$//' | MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-weak-memory-emulation" xargs -n1 cargo miri test -p bevy_tasks --lib -- --exact + run: cargo miri test -p bevy_tasks --features bevy_executor --features multi_threaded + - name: CI job (ECS) # To run the tests one item at a time for troubleshooting, use # cargo --quiet test --lib -- --list | sed 's/: test$//' | MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-weak-memory-emulation" xargs -n1 cargo miri test -p bevy_ecs --lib -- --exact run: cargo miri test -p bevy_ecs --features bevy_utils/debug - env: - # -Zrandomize-layout makes sure we dont rely on the layout of anything that might change - RUSTFLAGS: -Zrandomize-layout - # https://github.com/rust-lang/miri#miri--z-flags-and-environment-variables - # -Zmiri-disable-isolation is needed because our executor uses `fastrand` which accesses system time. - # -Zmiri-ignore-leaks is necessary because a bunch of tests don't join all threads before finishing. - MIRIFLAGS: -Zmiri-ignore-leaks -Zmiri-disable-isolation check-compiles: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index dbf0401117928..97ded15ff9209 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,7 +127,6 @@ unused_qualifications = "warn" [features] default = [ "std", - "async_executor", "android-game-activity", "android_shared_stdcxx", "animation", @@ -566,9 +565,6 @@ custom_cursor = ["bevy_internal/custom_cursor"] # Experimental support for nodes that are ignored for UI layouting ghost_nodes = ["bevy_internal/ghost_nodes"] -# Uses `async-executor` as a task execution backend. -async_executor = ["std", "bevy_internal/async_executor"] - # Allows access to the `std` crate. std = ["bevy_internal/std"] diff --git a/crates/bevy_a11y/Cargo.toml b/crates/bevy_a11y/Cargo.toml index 4d36eeecc1011..1a2d255cd879a 100644 --- a/crates/bevy_a11y/Cargo.toml +++ b/crates/bevy_a11y/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" keywords = ["bevy", "accessibility", "a11y"] [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor"] +default = ["std", "bevy_reflect"] # Functionality diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 789debfac3578..92c703a0f1125 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1476,8 +1476,7 @@ type RunnerFn = Box AppExit>; fn run_once(mut app: App) -> AppExit { while app.plugins_state() == PluginsState::Adding { - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - bevy_tasks::tick_global_task_pools_on_main_thread(); + core::hint::spin_loop(); } app.finish(); app.cleanup(); diff --git a/crates/bevy_app/src/schedule_runner.rs b/crates/bevy_app/src/schedule_runner.rs index 594f849b2f905..7e987cfe58be1 100644 --- a/crates/bevy_app/src/schedule_runner.rs +++ b/crates/bevy_app/src/schedule_runner.rs @@ -77,8 +77,7 @@ impl Plugin for ScheduleRunnerPlugin { let plugins_state = app.plugins_state(); if plugins_state != PluginsState::Cleaned { while app.plugins_state() == PluginsState::Adding { - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - bevy_tasks::tick_global_task_pools_on_main_thread(); + core::hint::spin_loop(); } app.finish(); app.cleanup(); diff --git a/crates/bevy_app/src/task_pool_plugin.rs b/crates/bevy_app/src/task_pool_plugin.rs index 8014790f07772..a68938910cd1d 100644 --- a/crates/bevy_app/src/task_pool_plugin.rs +++ b/crates/bevy_app/src/task_pool_plugin.rs @@ -6,21 +6,6 @@ use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuil use core::fmt::Debug; use log::trace; -cfg_if::cfg_if! { - if #[cfg(not(all(target_arch = "wasm32", feature = "web")))] { - use {crate::Last, bevy_tasks::tick_global_task_pools_on_main_thread}; - use bevy_ecs::system::NonSendMarker; - - /// A system used to check and advanced our task pools. - /// - /// Calls [`tick_global_task_pools_on_main_thread`], - /// and uses [`NonSendMarker`] to ensure that this system runs on the main thread - fn tick_global_task_pools(_main_thread_marker: NonSendMarker) { - tick_global_task_pools_on_main_thread(); - } - } -} - /// Setup of default task pools: [`AsyncComputeTaskPool`], [`ComputeTaskPool`], [`IoTaskPool`]. #[derive(Default)] pub struct TaskPoolPlugin { @@ -32,9 +17,6 @@ impl Plugin for TaskPoolPlugin { fn build(&self, _app: &mut App) { // Setup the default bevy task pools self.task_pool_options.create_default_pools(); - - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - _app.add_systems(Last, tick_global_task_pools); } } diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index bde2ad5e033c3..c1c1801b885b3 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -30,9 +30,7 @@ bevy_ecs = { path = "../bevy_ecs", version = "0.18.0-dev", default-features = fa bevy_reflect = { path = "../bevy_reflect", version = "0.18.0-dev", default-features = false, features = [ "uuid", ] } -bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", default-features = false, features = [ - "async_executor", -] } +bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", default-features = false } bevy_utils = { path = "../bevy_utils", version = "0.18.0-dev", default-features = false } bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-features = false, features = [ "std", diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index eb39ed859e33a..394ee0c56f172 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"] rust-version = "1.86.0" [features] -default = ["std", "bevy_reflect", "async_executor", "backtrace"] +default = ["std", "bevy_reflect", "backtrace"] # Functionality @@ -49,12 +49,6 @@ bevy_debug_stepping = [] ## This will often provide more detailed error messages. track_location = [] -# Executor Backend - -## Uses `async-executor` as a task execution backend. -## This backend is incompatible with `no_std` targets. -async_executor = ["std", "bevy_tasks/async_executor"] - # Platform Compatibility ## Allows access to the `std` crate. Enabling this feature will prevent compilation diff --git a/crates/bevy_ecs/src/schedule/executor/mod.rs b/crates/bevy_ecs/src/schedule/executor/mod.rs index 7d595aeec1a8d..4c23522414601 100644 --- a/crates/bevy_ecs/src/schedule/executor/mod.rs +++ b/crates/bevy_ecs/src/schedule/executor/mod.rs @@ -9,7 +9,7 @@ use core::any::TypeId; pub use self::single_threaded::SingleThreadedExecutor; #[cfg(feature = "std")] -pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor}; +pub use self::multi_threaded::{MainThreadTaskSpawner, MultiThreadedExecutor}; use fixedbitset::FixedBitSet; diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 497b937c31f51..d0aadea11b9b0 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -1,7 +1,6 @@ use alloc::{boxed::Box, vec::Vec}; use bevy_platform::cell::SyncUnsafeCell; -use bevy_platform::sync::Arc; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, LocalTaskSpawner, Scope, TaskPool}; use concurrent_queue::ConcurrentQueue; use core::{any::Any, panic::AssertUnwindSafe}; use fixedbitset::FixedBitSet; @@ -270,14 +269,12 @@ impl SystemExecutor for MultiThreadedExecutor { } let thread_executor = world - .get_resource::() + .get_resource::() .map(|e| e.0.clone()); - let thread_executor = thread_executor.as_deref(); let environment = &Environment::new(self, schedule, world); ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( - false, thread_executor, |scope| { let context = Context { @@ -871,20 +868,20 @@ unsafe fn evaluate_and_fold_conditions( .fold(true, |acc, res| acc && res) } -/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +/// New-typed [`LocalTaskSpawner`] [`Resource`] that is used to run systems on the main thread #[derive(Resource, Clone)] -pub struct MainThreadExecutor(pub Arc>); +pub struct MainThreadTaskSpawner(pub LocalTaskSpawner<'static>); -impl Default for MainThreadExecutor { +impl Default for MainThreadTaskSpawner { fn default() -> Self { Self::new() } } -impl MainThreadExecutor { +impl MainThreadTaskSpawner { /// Creates a new executor that can be used to run systems on the main thread. pub fn new() -> Self { - MainThreadExecutor(TaskPool::get_thread_executor()) + MainThreadTaskSpawner(ComputeTaskPool::get().current_thread_spawner()) } } diff --git a/crates/bevy_input/Cargo.toml b/crates/bevy_input/Cargo.toml index cf6d1a3dd9add..b0e1bcb2f8585 100644 --- a/crates/bevy_input/Cargo.toml +++ b/crates/bevy_input/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor", "smol_str"] +default = ["std", "bevy_reflect", "smol_str"] # Functionality diff --git a/crates/bevy_input_focus/Cargo.toml b/crates/bevy_input_focus/Cargo.toml index 1766ee2243b31..e03a2e0d21fe1 100644 --- a/crates/bevy_input_focus/Cargo.toml +++ b/crates/bevy_input_focus/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["bevy"] rust-version = "1.85.0" [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor"] +default = ["std", "bevy_reflect"] # Functionality diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml index 3a43a38147c50..0ed62327171db 100644 --- a/crates/bevy_internal/Cargo.toml +++ b/crates/bevy_internal/Cargo.toml @@ -409,15 +409,6 @@ libm = [ "bevy_window?/libm", ] -# Uses `async-executor` as a task execution backend. -# This backend is incompatible with `no_std` targets. -async_executor = [ - "std", - "bevy_tasks/async_executor", - "bevy_ecs/async_executor", - "bevy_transform/async_executor", -] - # Enables use of browser APIs. # Note this is currently only applicable on `wasm32` architectures. web = ["bevy_app/web", "bevy_platform/web", "bevy_reflect/web"] diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs index 3aaa2ac8cc203..455fc65ccffa9 100644 --- a/crates/bevy_render/src/pipelined_rendering.rs +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -3,7 +3,7 @@ use async_channel::{Receiver, Sender}; use bevy_app::{App, AppExit, AppLabel, Plugin, SubApp}; use bevy_ecs::{ resource::Resource, - schedule::MainThreadExecutor, + schedule::MainThreadTaskSpawner, world::{Mut, World}, }; use bevy_tasks::ComputeTaskPool; @@ -114,7 +114,7 @@ impl Plugin for PipelinedRenderingPlugin { if app.get_sub_app(RenderApp).is_none() { return; } - app.insert_resource(MainThreadExecutor::new()); + app.insert_resource(MainThreadTaskSpawner::new()); let mut sub_app = SubApp::new(); sub_app.set_extract(renderer_extract); @@ -136,7 +136,7 @@ impl Plugin for PipelinedRenderingPlugin { .expect("Unable to get RenderApp. Another plugin may have removed the RenderApp before PipelinedRenderingPlugin"); // clone main thread executor to render world - let executor = app.world().get_resource::().unwrap(); + let executor = app.world().get_resource::().unwrap(); render_app.world_mut().insert_resource(executor.clone()); render_to_app_sender.send_blocking(render_app).unwrap(); @@ -181,12 +181,12 @@ impl Plugin for PipelinedRenderingPlugin { // This function waits for the rendering world to be received, // runs extract, and then sends the rendering world back to the render thread. fn renderer_extract(app_world: &mut World, _world: &mut World) { - app_world.resource_scope(|world, main_thread_executor: Mut| { + app_world.resource_scope(|world, main_thread_executor: Mut| { world.resource_scope(|world, mut render_channels: Mut| { // we use a scope here to run any main thread tasks that the render world still needs to run // while we wait for the render world to be received. if let Some(mut render_app) = ComputeTaskPool::get() - .scope_with_executor(true, Some(&*main_thread_executor.0), |s| { + .scope_with_executor(Some(main_thread_executor.0.clone()), |s| { s.spawn(async { render_channels.recv().await }); }) .pop() diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index abe5a1485a6ad..f9cda7aa9d2cb 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,20 +9,24 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["async_executor", "futures-lite"] +default = ["futures-lite"] # Enables multi-threading support. # Without this feature, all tasks will be run on a single thread. -multi_threaded = [ - "bevy_platform/std", - "dep:async-channel", - "dep:concurrent-queue", - "async_executor", -] +multi_threaded = ["bevy_platform/std", "dep:async-channel", "bevy_executor"] -# Uses `async-executor` as a task execution backend. +# Uses a Bevy-specific fork of `async-executor` as a task execution backend. # This backend is incompatible with `no_std` targets. -async_executor = ["bevy_platform/std", "dep:async-executor", "futures-lite"] +bevy_executor = [ + "dep:fastrand", + "dep:slab", + "dep:thread_local", + "dep:crossbeam-utils", + "dep:pin-project-lite", + "futures-lite", + "async-task/std", + "concurrent-queue/std", +] # Provide an implementation of `block_on` from `futures-lite`. futures-lite = ["bevy_platform/std", "futures-lite/std"] @@ -44,17 +48,18 @@ derive_more = { version = "2", default-features = false, features = [ "deref", "deref_mut", ] } -async-executor = { version = "1.11", optional = true } +slab = { version = "0.4", optional = true } +pin-project-lite = { version = "0.2", optional = true } +thread_local = { version = "1.1", optional = true } +fastrand = { version = "2.3", optional = true, default-features = false } async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } -concurrent-queue = { version = "2.0.0", optional = true } atomic-waker = { version = "1", default-features = false } -crossbeam-queue = { version = "0.3", default-features = false, features = [ - "alloc", -] } +concurrent-queue = { version = "2.5", default-features = false } +crossbeam-utils = { version = "0.8", default-features = false, optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -pin-project = "1" +pin-project-lite = "0.2" async-channel = { version = "2.3.0", default-features = false } [target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies] @@ -73,6 +78,7 @@ futures-lite = { version = "2.0.1", default-features = false, features = [ "std", ] } async-channel = "2.3.0" +async-io = "2.0.0" [lints] workspace = true diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 04815df35e6ed..530489bfaa115 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -14,8 +14,8 @@ a single thread and having that thread await the completion of those tasks. This generating the tasks from a slice of data. This library is intended for games and makes no attempt to ensure fairness or ordering of spawned tasks. -It is based on [`async-executor`][async-executor], a lightweight executor that allows the end user to manage their own threads. -`async-executor` is based on async-task, a core piece of async-std. +It is based on a fork of [`async-executor`][async-executor], a lightweight executor that allows the end user to manage their own threads. +`async-executor` is based on [`async-task`][async-task], a core piece of [`smol`][smol]. ## Usage @@ -40,4 +40,6 @@ To enable `no_std` support in this crate, you will need to disable default featu [bevy]: https://bevy.org [rayon]: https://github.com/rayon-rs/rayon -[async-executor]: https://github.com/stjepang/async-executor +[async-executor]: https://github.com/smol-rs/async-executor +[smol]: https://github.com/smol-rs/smol +[async-task]: https://github.com/smol-rs/async-task diff --git a/crates/bevy_tasks/src/bevy_executor.rs b/crates/bevy_tasks/src/bevy_executor.rs new file mode 100644 index 0000000000000..0be228ec2e09c --- /dev/null +++ b/crates/bevy_tasks/src/bevy_executor.rs @@ -0,0 +1,1294 @@ +//! Fork of `async_executor`. +//! +//! It has been vendored along with its tests to update several outdated dependencies. +//! +//! [`async_executor`]: https://github.com/smol-rs/async-executor + +#![expect( + unsafe_code, + reason = "Executor code requires unsafe code for dealing with non-'static lifetimes" +)] +#![allow( + dead_code, + reason = "Not all functions are used with every feature combination" +)] + +use core::marker::PhantomData; +use core::panic::{RefUnwindSafe, UnwindSafe}; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use core::task::{Context, Poll, Waker}; +use core::cell::UnsafeCell; +use core::mem; +use std::thread::{AccessError, ThreadId}; + +use alloc::collections::VecDeque; +use alloc::fmt; +use async_task::{Builder, Runnable, Task}; +use bevy_platform::prelude::Vec; +use bevy_platform::sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, TryLockError}; +use concurrent_queue::ConcurrentQueue; +use futures_lite::{future,FutureExt}; +use slab::Slab; +use thread_local::ThreadLocal; +use crossbeam_utils::CachePadded; + +// ThreadLocalState *must* stay `Sync` due to a currently existing soundness hole. +// See: https://github.com/Amanieu/thread_local-rs/issues/75 +static THREAD_LOCAL_STATE: ThreadLocal = ThreadLocal::new(); + +pub(crate) fn install_runtime_into_current_thread(executor: &Executor) { + // Use LOCAL_QUEUE here to set the thread destructor + LOCAL_QUEUE.with(|_| { + let tls = THREAD_LOCAL_STATE.get_or_default(); + let state = executor.state_as_arc(); + let state_ptr = Arc::into_raw(state).cast_mut(); + let old_ptr = tls.executor.swap(state_ptr, Ordering::Relaxed); + if !old_ptr.is_null() { + // SAFETY: If this pointer was not null, it was initialized from Arc::into_raw. + drop(unsafe { Arc::from_raw(old_ptr) }); + } + }); +} + +std::thread_local! { + static LOCAL_QUEUE: CachePadded> = const { + CachePadded::new(UnsafeCell::new(LocalQueue { + local_queue: VecDeque::new(), + local_active: Slab::new(), + })) + }; +} + +/// # Safety +/// This must not be accessed at the same time as `LOCAL_QUEUE` in any way. +#[inline(always)] +unsafe fn try_with_local_queue(f: impl FnOnce(&mut LocalQueue) -> T) -> Result { + LOCAL_QUEUE.try_with(|tls| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. The caller guarantees that this function is not used with + // LOCAL_QUEUE in any way. + f(unsafe { &mut *tls.get() }) + }) +} + +struct LocalQueue { + local_queue: VecDeque, + local_active: Slab, +} + +impl Drop for LocalQueue { + fn drop(&mut self) { + for waker in self.local_active.drain() { + waker.wake(); + } + + while self.local_queue.pop_front().is_some() {} + } +} + +struct ThreadLocalState { + executor: AtomicPtr, + stealable_queue: ConcurrentQueue, + thread_locked_queue: ConcurrentQueue, +} + +impl Default for ThreadLocalState { + fn default() -> Self { + Self { + executor: AtomicPtr::new(core::ptr::null_mut()), + stealable_queue: ConcurrentQueue::bounded(512), + thread_locked_queue: ConcurrentQueue::unbounded(), + } + } +} + +/// A task spawner for a specific thread. Must be created by calling [`TaskPool::current_thread_spawner`] +/// from the target thread. +/// +/// [`TaskPool::current_thread_spawner`]: crate::TaskPool::current_thread_spawner +#[derive(Clone, Debug)] +pub struct LocalTaskSpawner<'a> { + thread_id: ThreadId, + target_queue: &'static ConcurrentQueue, + state: Arc, + _marker: PhantomData<&'a ()>, +} + +impl<'a> LocalTaskSpawner<'a> { + /// Spawns a task onto the specific target thread. + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + // SAFETY: T and `future` are both 'static, so the Task is guaranteed to not outlive it. + unsafe { self.spawn_scoped(future) } + } + + /// Spawns a task onto the executor. + /// + /// # Safety + /// The caller must ensure that the returned Task does not outlive 'a. + pub unsafe fn spawn_scoped( + &self, + future: impl Future + Send + 'a, + ) -> Task { + let mut active = self.state.active(); + + // Remove the task from the set of active tasks when the future finishes. + let entry = active.vacant_entry(); + let index = entry.key(); + let state = self.state.clone(); + let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index))); + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is `Send`. Therefore we do not need to worry about what thread + // the produced `Runnable` is used and dropped from. + // - `future` is not `'static`, but the caller must make sure that the Task + // and thus the `Runnable` will not outlive `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + entry.insert(runnable.waker()); + + // Instead of directly scheduling this task, it's put into the onto the + // thread locked queue to be moved to the target thread, where it will + // either be run immediately or flushed into the thread's local queue. + let result = self.target_queue.push(runnable); + debug_assert!(result.is_ok()); + task + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let thread_id = self.thread_id; + let state = self.state.clone(); + + move |runnable| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + if unsafe { try_with_local_queue(|tls| tls.local_queue.push_back(runnable)) }.is_ok() { + state.notify_specific_thread(thread_id, false); + } + } + } +} + +/// An async executor. +pub struct Executor<'a> { + /// The executor state. + state: AtomicPtr, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData<&'a ()>, +} + +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl fmt::Debug for Executor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "Executor", f) + } +} + +impl<'a> Executor<'a> { + /// Creates a new executor. + pub const fn new() -> Executor<'a> { + Executor { + state: AtomicPtr::new(core::ptr::null_mut()), + _marker: PhantomData, + } + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { + let state = self.state(); + let mut active = state.active(); + + // Remove the task from the set of active tasks when the future finishes. + let entry = active.vacant_entry(); + let index = entry.key(); + let state_arc = self.state_as_arc(); + let future = AsyncCallOnDrop::new(future, move || drop(state_arc.active().try_remove(index))); + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is `Send`. Therefore we do not need to worry about what thread + // the produced `Runnable` is used and dropped from. + // - `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + entry.insert(runnable.waker()); + + // `Runnable::schedule ` has a extra extraneous Waker clone/drop if the schedule captures + // variables, so directly schedule here instead. + Self::schedule_runnable(state, runnable); + task + } + + /// Spawns a non-Send task onto the executor. + pub fn spawn_local(&self, future: impl Future + 'static) -> Task { + // SAFETY: future is 'static + unsafe { self.spawn_local_scoped(future) } + } + + /// Spawns a non-'static and non-Send task onto the executor. + /// + /// # Panics + /// - This function will panic if any of the internal allocations causes an OOM (out of memory) error. + /// - This function will panic if the current thread's destructor has already been called. + /// + /// # Safety + /// The caller must ensure that the returned Task does not outlive 'a. + pub unsafe fn spawn_local_scoped( + &self, + future: impl Future + 'a, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. + // + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + unsafe { + try_with_local_queue(|tls| { + let entry = tls.local_active.vacant_entry(); + let index = entry.key(); + let builder = Builder::new().propagate_panic(true); + + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. This AsyncCallOnDrop will be + // invoked after the surrounding scope has exited in either a + // `try_tick_local` or `run` call. + let future = AsyncCallOnDrop::new(future, move || { + try_with_local_queue(|tls| drop(tls.local_active.try_remove(index))).ok(); + }); + + // This is a critical section which will result in UB by aliasing active + // if the AsyncCallOnDrop is called while still in this function. + // + // To avoid this, this guard will abort the process if it does + // panic. Rust's drop order will ensure that this will run before + // executor, and thus before the above AsyncCallOnDrop is dropped. + let _panic_guard = AbortOnPanic; + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is not `Send`, but the produced `Runnable` does is bound + // to thread-local storage and thus cannot leave this thread of execution. + // - `future` may not be `'static`, but the caller is required to ensure that + // the future does not outlive the borrowed non-metadata variables of the + // task. + // - `self.schedule_local()` is not `Send` or `Sync` so all instances + // must not leave the current thread of execution, and it does not + // all of them are bound vy use of thread-local storage. + // - `self.schedule_local()` is `'static`, as checked below. + let (runnable, task) = builder + .spawn_unchecked(|()| future, self.schedule_local()); + entry.insert(runnable.waker()); + + mem::forget(_panic_guard); + + // Runnable::schedule has a extra extraneous Waker clone/drop if the schedule captures + // variables, so directly schedule here instead. + Self::schedule_runnable_local(self.state(), tls, runnable); + + task + }).unwrap() + } + } + + pub fn current_thread_spawner(&self) -> LocalTaskSpawner<'a> { + LocalTaskSpawner { + thread_id: std::thread::current().id(), + target_queue: &THREAD_LOCAL_STATE.get_or_default().thread_locked_queue, + state: self.state_as_arc(), + _marker: PhantomData, + } + } + + pub fn try_tick_local() -> bool { + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. As the Runnable is run after + // this scope closes, the AsyncCallOnDrop around the future will be invoked + // without overlapping mutable accssses. + unsafe { try_with_local_queue(|tls| tls.local_queue.pop_front()) } + .ok() + .flatten() + .map(Runnable::run) + .is_some() + } + + /// Runs the executor until the given future completes. + pub fn run<'b, T>(&'b self, future: impl Future + 'b) -> impl Future + 'b { + let mut runner = Runner::new(self.state()); + + // A future that runs tasks forever. + let run_forever = async move { + let mut rng = fastrand::Rng::new(); + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever) + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state_as_arc(); + + move |runnable| { + Self::schedule_runnable(&state, runnable); + } + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule_local(&self) -> impl Fn(Runnable) + 'static { + let state = self.state_as_arc(); + move |runnable| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + unsafe { + // If this returns Err, the thread's destructor has been called and thus it's meaningless + // to push onto the queue. + let _ = try_with_local_queue(|tls| { + Self::schedule_runnable_local(&state, tls, runnable); + }); + } + } + } + + #[inline] + fn schedule_runnable(state: &State, runnable: Runnable) { + // Attempt to push onto the local queue first in dedicated executor threads, + // because we know that this thread is awake and always processing new tasks. + let runnable = if let Some(local_state) = THREAD_LOCAL_STATE.get() { + if core::ptr::eq(local_state.executor.load(Ordering::Relaxed), state) { + match local_state.stealable_queue.push(runnable) { + Ok(()) => { + state.notify_specific_thread(std::thread::current().id(), true); + return; + } + Err(r) => r.into_inner(), + } + } else { + runnable + } + } else { + runnable + }; + // Otherwise push onto the global queue instead. + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); + state.notify(); + } + + #[inline] + fn schedule_runnable_local(state: &State, tls: &mut LocalQueue, runnable: Runnable) { + tls.local_queue.push_back(runnable); + state.notify_specific_thread(std::thread::current().id(), false); + } + + /// Returns a pointer to the inner state. + #[inline] + fn state_ptr(&self) -> *const State { + #[cold] + fn alloc_state(atomic_ptr: &AtomicPtr) -> *mut State { + let state = Arc::new(State::new()); + let ptr = Arc::into_raw(state).cast_mut(); + if let Err(actual) = atomic_ptr.compare_exchange( + core::ptr::null_mut(), + ptr, + Ordering::AcqRel, + Ordering::Acquire, + ) { + // SAFETY: This was just created from Arc::into_raw. + drop(unsafe { Arc::from_raw(ptr) }); + actual + } else { + ptr + } + } + + let mut ptr = self.state.load(Ordering::Acquire); + if ptr.is_null() { + ptr = alloc_state(&self.state); + } + ptr + } + + /// Returns a reference to the inner state. + #[inline] + fn state(&self) -> &State { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. + unsafe { &*self.state_ptr() } + } + + // Clones the inner state Arc + #[inline] + fn state_as_arc(&self) -> Arc { + // SAFETY: So long as an Executor lives, it's state pointer will always be a valid + // Arc when accessed through state_ptr. + let arc = unsafe { Arc::from_raw(self.state_ptr()) }; + let clone = arc.clone(); + mem::forget(arc); + clone + } +} + +impl Drop for Executor<'_> { + fn drop(&mut self) { + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Arc::new and converted + // via Arc::into_raw in state_ptr. + let state = unsafe { Arc::from_raw(ptr) }; + + let mut active = state.active(); + for w in active.drain() { + w.wake(); + } + drop(active); + + while state.queue.pop().is_ok() {} + } +} + +/// The state of a executor. +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + stealer_queues: RwLock>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, + + /// Currently active tasks. + active: Mutex>, +} + +impl State { + /// Creates state for a new executor. + const fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + stealer_queues: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: Mutex::new(Slab::new()), + } + } + + /// Returns a reference to currently active tasks. + fn active(&self) -> MutexGuard<'_, Slab> { + self.active.lock().unwrap_or_else(PoisonError::into_inner) + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let waker = self.sleepers.lock().unwrap_or_else(PoisonError::into_inner).notify(); + if let Some(w) = waker { + w.wake(); + } + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify_specific_thread(&self, thread_id: ThreadId, allow_stealing: bool) { + let mut sleepers = self.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + let mut waker = sleepers.notify_specific_thread(thread_id); + if waker.is_none() + && allow_stealing + && self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + waker = sleepers.notify(); + } + if let Some(w) = waker { + w.wake(); + } + } +} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(self, "State", f) + } +} + +/// A sleeping ticker +struct Sleeper { + id: usize, + thread_id: ThreadId, + waker: Waker, +} + +/// A list of sleeping tickers. +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1); + self.count += 1; + self.wakers.push(Sleeper { + id, + thread_id: std::thread::current().id(), + waker: waker.clone() + }); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.id == id { + item.waker.clone_from(waker); + return false; + } + } + + self.wakers.push(Sleeper { + id, + thread_id: std::thread::current().id(), + waker: waker.clone() + }); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].id == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.waker) + } else { + None + } + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify_specific_thread(&mut self, thread_id: ThreadId) -> Option { + for i in 0..self.wakers.len() { + if self.wakers[i].thread_id == thread_id { + let sleeper = self.wakers.remove(i); + return Some(sleeper.waker); + } + } + None + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. + fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + + match self.sleeping { + // Move to sleeping state. + 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + /// + /// # Safety + /// Caller must not access `LOCAL_QUEUE` either directly or with `try_with_local_queue` in any way inside `search`. + unsafe fn runnable_with(&mut self, mut search: impl FnMut(&mut LocalQueue) -> Option) -> impl Future { + future::poll_fn(move |cx| { + // SAFETY: Caller must ensure that there's no instances where LOCAL_QUEUE is accessed mutably + // from multiple locations simultaneously. + unsafe { + try_with_local_queue(|tls| { + loop { + match search(tls) { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + return Poll::Ready(r); + } + } + } + }).unwrap_or(Poll::Pending) + } + }) + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + let notified = sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +struct Runner<'a> { + /// The executor state. + state: &'a State, + + /// Inner ticker. + ticker: Ticker<'a>, + + /// Bumped every time a runnable task is found. + ticks: usize, + + // The thread local state of the executor for the current thread. + local_state: &'a ThreadLocalState, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let local_state = THREAD_LOCAL_STATE.get_or_default(); + let runner = Runner { + state, + ticker: Ticker::new(state), + ticks: 0, + local_state, + }; + state + .stealer_queues + .write() + .unwrap_or_else(PoisonError::into_inner) + .push(&local_state.stealable_queue); + runner + } + + /// Waits for the next runnable task to run. + fn runnable(&mut self, _rng: &mut fastrand::Rng) -> impl Future { + // SAFETY: The provided search function does not access LOCAL_QUEUE in any way, and thus cannot + // alias. + let runnable = unsafe { + self + .ticker + .runnable_with(|tls| { + if let Some(r) = tls.local_queue.pop_back() { + return Some(r); + } + + crate::cfg::multi_threaded! { + // Try the local queue. + if let Ok(r) = self.local_state.stealable_queue.pop() { + return Some(r); + } + + // Try stealing from the global queue. + if let Ok(r) = self.state.queue.pop() { + steal(&self.state.queue, &self.local_state.stealable_queue); + return Some(r); + } + + // Try stealing from other runners. + if let Ok(stealer_queues) = self.state.stealer_queues.try_read() { + // Pick a random starting point in the iterator list and rotate the list. + let n = stealer_queues.len(); + let start = _rng.usize(..n); + let iter = stealer_queues + .iter() + .chain(stealer_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = + iter.filter(|local| !core::ptr::eq(**local, &self.local_state.stealable_queue)); + + // Try stealing from each local queue in the list. + for local in iter { + steal(*local, &self.local_state.stealable_queue); + if let Ok(r) = self.local_state.stealable_queue.pop() { + return Some(r); + } + } + } + + if let Ok(r) = self.local_state.thread_locked_queue.pop() { + // Do not steal from this queue. If other threads steal + // from this current thread, the task will be moved. + // + // Instead, flush all queued tasks into the local queue to + // minimize the effort required to scan for these tasks. + flush_to_local(&self.local_state.thread_locked_queue, tls); + return Some(r); + } + } + + None + }) + }; + + // Bump the tick counter. + self.ticks = self.ticks.wrapping_add(1); + + if self.ticks.is_multiple_of(64) { + // Steal tasks from the global queue to ensure fair task scheduling. + steal(&self.state.queue, &self.local_state.stealable_queue); + } + + runnable + } +} + +impl Drop for Runner<'_> { + fn drop(&mut self) { + // Remove the local queue. + { + let mut stealer_queues = self.state.stealer_queues.write().unwrap(); + if let Some((idx, _)) = stealer_queues + .iter() + .enumerate() + .rev() + .find(|(_, local)| core::ptr::eq(**local, &self.local_state.stealable_queue)) + { + stealer_queues.remove(idx); + } + } + + // Re-schedule remaining tasks in the local queue. + while let Ok(r) = self.local_state.stealable_queue.pop() { + r.schedule(); + } + } +} + +/// Steals some items from one queue into another. +fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { + // Half of `src`'s length rounded up. + let mut count = src.len(); + + if count > 0 { + if let Some(capacity) = dest.capacity() { + // Don't steal more than fits into the queue. + count = count.min(capacity- dest.len()); + } + + // Steal tasks. + for _ in 0..count { + let Ok(val) = src.pop() else { break }; + assert!(dest.push(val).is_ok()); + } + } +} + +fn flush_to_local(src: &ConcurrentQueue, dst: &mut LocalQueue) { + let count = src.len(); + + if count > 0 { + // Steal tasks. + for _ in 0..count { + let Ok(val) = src.pop() else { break }; + dst.local_queue.push_front(val); + } + } +} + +/// Debug implementation for `Executor`. +fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.load(Ordering::Acquire); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Arc::new and converted via Arc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a Mutex>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.len(), f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f), + } + } + } + + /// Debug wrapper for the local runners. + struct LocalRunners<'a>(&'a RwLock>>); + + impl fmt::Debug for LocalRunners<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_read() { + Ok(lock) => f + .debug_list() + .entries(lock.iter().map(|queue| queue.len())) + .finish(), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a Mutex); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.count, f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + .field("global_tasks", &state.queue.len()) + .field("stealer_queues", &LocalRunners(&state.stealer_queues)) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} + +struct AbortOnPanic; + +impl Drop for AbortOnPanic { + fn drop(&mut self) { + // Panicking while unwinding will force an abort. + panic!("Aborting due to allocator error"); + } +} + +/// Runs a closure when dropped. +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} + +pin_project_lite::pin_project! { + /// A wrapper around a future, running a closure when dropped. + struct AsyncCallOnDrop { + #[pin] + future: Fut, + cleanup: CallOnDrop, + } +} + +impl AsyncCallOnDrop { + fn new(future: Fut, cleanup: Cleanup) -> Self { + Self { + future, + cleanup: CallOnDrop(cleanup), + } + } +} + +impl Future for AsyncCallOnDrop { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) + } +} + +#[cfg(test)] +mod test { + use super::*; + use super::THREAD_LOCAL_STATE; + use alloc::{string::String, boxed::Box}; + use futures_lite::{future, pin}; + use std::panic::catch_unwind; + use core::sync::atomic::{AtomicUsize, Ordering}; + use bevy_platform::sync::Mutex; + use core::task::{Poll, Waker}; + use async_task::Task; + use core::time::Duration; + + fn _ensure_send_and_sync() { + fn is_send(_: T) {} + fn is_sync(_: T) {} + fn is_static(_: T) {} + + is_send::>(Executor::new()); + is_sync::>(Executor::new()); + + let ex = Executor::new(); + is_send(ex.schedule()); + is_sync(ex.schedule()); + is_static(ex.schedule()); + is_send(ex.current_thread_spawner()); + is_sync(ex.current_thread_spawner()); + is_send(THREAD_LOCAL_STATE.get_or_default()); + is_sync(THREAD_LOCAL_STATE.get_or_default()); + } + + #[test] + fn executor_cancels_everything() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Mutex> = Mutex::new(None); + + let ex = Executor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + } + + #[test] + fn await_task_after_dropping_executor() { + let s: String = "hello".into(); + + let ex = Executor::new(); + let task: Task<&str> = ex.spawn(async { &*s }); + future::block_on(ex.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + + drop(ex); + assert_eq!(future::block_on(task), "hello"); + drop(s); + } + + #[test] + fn drop_executor_and_then_drop_finished_task() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = Executor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + future::block_on(ex.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + } + + #[test] + fn drop_finished_task_and_then_drop_executor() { + static DROP: AtomicUsize = AtomicUsize::new(0); + + let ex = Executor::new(); + let task = ex.spawn(async { + CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }) + }); + future::block_on(ex.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + assert_eq!(DROP.load(Ordering::SeqCst), 0); + drop(task); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + } + + fn do_run>(mut f: impl FnMut(Arc>) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let _stop_timeout = { + let (stop_timeout, stopper) = async_channel::bounded::<()>(1); + std::thread::spawn(move || { + future::block_on(async move { + #[expect(clippy::print_stderr, reason = "Explicitly used to warn about timed out tests")] + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + std::eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }); + }); + stop_timeout + }; + + let ex = Arc::new(Executor::new()); + + // Test 1: Use the `run` command. + future::block_on(ex.run(f(ex.clone()))); + + // Test 2: Run on many threads. + std::thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &ex; + scope.spawn(move || future::block_on(ex.run(shutdown.recv()))); + } + + future::block_on(f(ex.clone())); + }); + } + + #[test] + fn smoke() { + do_run(|ex| async move { ex.spawn(async {}).await }); + } + + #[test] + fn yield_now() { + do_run(|ex| async move { ex.spawn(future::yield_now()).await }); + } + + #[test] + fn timer() { + do_run(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }); + } + + #[test] + fn test_panic_propagation() { + let ex = Executor::new(); + let task = ex.spawn(async { panic!("should be caught by the task") }); + + // Running the executor should not panic. + future::block_on(ex.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + // Polling the task should. + assert!(future::block_on(task.catch_unwind()).is_err()); + } + + #[test] + fn two_queues() { + future::block_on(async { + // Create an executor with two runners. + let ex = Executor::new(); + let (run1, run2) = ( + ex.run(future::pending::<()>()), + ex.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); + } +} diff --git a/crates/bevy_tasks/src/edge_executor.rs b/crates/bevy_tasks/src/edge_executor.rs index a8c80725cafe9..efe3d9930c3c5 100644 --- a/crates/bevy_tasks/src/edge_executor.rs +++ b/crates/bevy_tasks/src/edge_executor.rs @@ -1,8 +1,7 @@ -//! Alternative to `async_executor` based on [`edge_executor`] by Ivan Markov. +//! Alternative to `bevy_executor` based on [`edge_executor`] by Ivan Markov. //! //! It has been vendored along with its tests to update several outdated dependencies. //! -//! [`async_executor`]: https://github.com/smol-rs/async-executor //! [`edge_executor`]: https://github.com/ivmarkov/edge-executor #![expect(unsafe_code, reason = "original implementation relies on unsafe")] @@ -13,7 +12,6 @@ // TODO: Create a more tailored replacement, possibly integrating [Fotre](https://github.com/NthTensor/Forte) -use alloc::rc::Rc; use core::{ future::{poll_fn, Future}, marker::PhantomData, @@ -97,6 +95,24 @@ impl<'a, const C: usize> Executor<'a, C> { unsafe { self.spawn_unchecked(fut) } } + pub fn spawn_local(&self, fut: F) -> Task + where + F: Future + 'a, + F::Output: 'a, + { + // SAFETY: Original implementation missing safety documentation + unsafe { self.spawn_unchecked(fut) } + } + + pub unsafe fn spawn_local_scoped(&self, fut: F) -> Task + where + F: Future + 'a, + F::Output: 'a, + { + // SAFETY: Original implementation missing safety documentation + unsafe { self.spawn_unchecked(fut) } + } + /// Attempts to run a task if at least one is scheduled. /// /// Running a scheduled task means simply polling its future once. @@ -162,7 +178,7 @@ impl<'a, const C: usize> Executor<'a, C> { /// ``` pub async fn run(&self, fut: F) -> F::Output where - F: Future + Send + 'a, + F: Future + 'a, { // SAFETY: Original implementation missing safety documentation unsafe { self.run_unchecked(fut).await } @@ -201,7 +217,7 @@ impl<'a, const C: usize> Executor<'a, C> { target_has_atomic = "ptr" ))] { - runnable = self.state().queue.pop(); + runnable = self.state().queue.pop().ok(); } #[cfg(not(all( @@ -298,141 +314,6 @@ unsafe impl<'a, const C: usize> Send for Executor<'a, C> {} // SAFETY: Original implementation missing safety documentation unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {} -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -/// -/// # Examples -/// -/// ```ignore -/// use edge_executor::{LocalExecutor, block_on}; -/// -/// let local_ex: LocalExecutor = Default::default(); -/// -/// block_on(local_ex.run(async { -/// println!("Hello world!"); -/// })); -/// ``` -pub struct LocalExecutor<'a, const C: usize = 64> { - executor: Executor<'a, C>, - _not_send: PhantomData>>, -} - -impl<'a, const C: usize> LocalExecutor<'a, C> { - /// Creates a single-threaded executor. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// ``` - pub const fn new() -> Self { - Self { - executor: Executor::::new(), - _not_send: PhantomData, - } - } - - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - /// - /// Note that if the executor's queue size is equal to the number of currently - /// spawned and running tasks, spawning this additional task might cause the executor to panic - /// later, when the task is scheduled for polling. - pub fn spawn(&self, fut: F) -> Task - where - F: Future + 'a, - F::Output: 'a, - { - // SAFETY: Original implementation missing safety documentation - unsafe { self.executor.spawn_unchecked(fut) } - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// assert!(!local_ex.try_tick()); // no tasks to run - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(local_ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.executor.try_tick() - } - - /// Runs a single task asynchronously. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::{LocalExecutor, block_on}; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// block_on(local_ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.executor.tick().await; - } - - /// Runs the executor asynchronously until the given future completes. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::{LocalExecutor, block_on}; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = block_on(local_ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, fut: F) -> F::Output - where - F: Future, - { - // SAFETY: Original implementation missing safety documentation - unsafe { self.executor.run_unchecked(fut) }.await - } -} - -impl<'a, const C: usize> Default for LocalExecutor<'a, C> { - fn default() -> Self { - Self::new() - } -} - struct State { #[cfg(all( target_has_atomic = "8", @@ -441,7 +322,7 @@ struct State { target_has_atomic = "64", target_has_atomic = "ptr" ))] - queue: crossbeam_queue::ArrayQueue, + queue: concurrent_queue::ConcurrentQueue, #[cfg(not(all( target_has_atomic = "8", target_has_atomic = "16", @@ -463,7 +344,7 @@ impl State { target_has_atomic = "64", target_has_atomic = "ptr" ))] - queue: crossbeam_queue::ArrayQueue::new(C), + queue: concurrent_queue::ConcurrentQueue::bounded(C), #[cfg(not(all( target_has_atomic = "8", target_has_atomic = "16", diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs deleted file mode 100644 index fcce0e2985536..0000000000000 --- a/crates/bevy_tasks/src/executor.rs +++ /dev/null @@ -1,86 +0,0 @@ -//! Provides a fundamental executor primitive appropriate for the target platform -//! and feature set selected. -//! By default, the `async_executor` feature will be enabled, which will rely on -//! [`async-executor`] for the underlying implementation. This requires `std`, -//! so is not suitable for `no_std` contexts. Instead, you must use `edge_executor`, -//! which relies on the alternate [`edge-executor`] backend. -//! -//! [`async-executor`]: https://crates.io/crates/async-executor -//! [`edge-executor`]: https://crates.io/crates/edge-executor - -use core::{ - fmt, - panic::{RefUnwindSafe, UnwindSafe}, -}; -use derive_more::{Deref, DerefMut}; - -crate::cfg::async_executor! { - if { - type ExecutorInner<'a> = async_executor::Executor<'a>; - type LocalExecutorInner<'a> = async_executor::LocalExecutor<'a>; - } else { - type ExecutorInner<'a> = crate::edge_executor::Executor<'a, 64>; - type LocalExecutorInner<'a> = crate::edge_executor::LocalExecutor<'a, 64>; - } -} - -crate::cfg::multi_threaded! { - pub use async_task::FallibleTask; -} - -/// Wrapper around a multi-threading-aware async executor. -/// Spawning will generally require tasks to be `Send` and `Sync` to allow multiple -/// threads to send/receive/advance tasks. -/// -/// If you require an executor _without_ the `Send` and `Sync` requirements, consider -/// using [`LocalExecutor`] instead. -#[derive(Deref, DerefMut, Default)] -pub struct Executor<'a>(ExecutorInner<'a>); - -/// Wrapper around a single-threaded async executor. -/// Spawning wont generally require tasks to be `Send` and `Sync`, at the cost of -/// this executor itself not being `Send` or `Sync`. This makes it unsuitable for -/// global statics. -/// -/// If need to store an executor in a global static, or send across threads, -/// consider using [`Executor`] instead. -#[derive(Deref, DerefMut, Default)] -pub struct LocalExecutor<'a>(LocalExecutorInner<'a>); - -impl Executor<'_> { - /// Construct a new [`Executor`] - #[expect(clippy::allow_attributes, reason = "This lint may not always trigger.")] - #[allow(dead_code, reason = "not all feature flags require this function")] - pub const fn new() -> Self { - Self(ExecutorInner::new()) - } -} - -impl LocalExecutor<'_> { - /// Construct a new [`LocalExecutor`] - #[expect(clippy::allow_attributes, reason = "This lint may not always trigger.")] - #[allow(dead_code, reason = "not all feature flags require this function")] - pub const fn new() -> Self { - Self(LocalExecutorInner::new()) - } -} - -impl UnwindSafe for Executor<'_> {} - -impl RefUnwindSafe for Executor<'_> {} - -impl UnwindSafe for LocalExecutor<'_> {} - -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl fmt::Debug for Executor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Executor").finish() - } -} - -impl fmt::Debug for LocalExecutor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LocalExecutor").finish() - } -} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7ec4f3748c692..cc0ad2d504370 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -13,9 +13,9 @@ pub mod cfg { pub use bevy_platform::cfg::{alloc, std, web}; define_alias! { - #[cfg(feature = "async_executor")] => { - /// Indicates `async_executor` is used as the future execution backend. - async_executor + #[cfg(feature = "bevy_executor")] => { + /// Indicates `bevy_executor` is used as the future execution backend. + bevy_executor } #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] => { @@ -72,15 +72,16 @@ use alloc::boxed::Box; pub type BoxedFuture<'a, T> = core::pin::Pin + 'a>>; // Modules -mod executor; pub mod futures; mod iter; mod slice; mod task; mod usages; -cfg::async_executor! { - if {} else { +cfg::bevy_executor! { + if { + mod bevy_executor; + } else { mod edge_executor; } } @@ -94,23 +95,15 @@ pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; pub use futures_lite; pub use futures_lite::future::poll_once; -cfg::web! { - if {} else { - pub use usages::tick_global_task_pools_on_main_thread; - } -} - cfg::multi_threaded! { if { mod task_pool; - mod thread_executor; - pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; - pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; + pub use task_pool::{Scope, TaskPool, TaskPoolBuilder, LocalTaskSpawner}; } else { mod single_threaded_task_pool; - pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; + pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, LocalTaskSpawner}; } } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index d81e43b4e91b9..044a9fdd8f10f 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,28 +1,19 @@ -use alloc::{string::String, vec::Vec}; -use bevy_platform::sync::Arc; +use alloc::{string::String, vec::Vec, fmt}; use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem}; +use crate::futures::now_or_never; -use crate::executor::LocalExecutor; use crate::{block_on, Task}; -crate::cfg::std! { - if { - use std::thread_local; - - use crate::executor::LocalExecutor as Executor; - - thread_local! { - static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() }; - } +crate::cfg::bevy_executor! { + if { + use crate::bevy_executor::Executor; } else { - - // Because we do not have thread-locals without std, we cannot use LocalExecutor here. - use crate::executor::Executor; - - static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() }; + use crate::edge_executor::Executor; } } +static EXECUTOR: Executor<'static> = const { Executor::new() }; + /// Used to create a [`TaskPool`]. #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} @@ -31,15 +22,9 @@ pub struct TaskPoolBuilder {} /// task pool. In the case of the multithreaded task pool this struct is used to spawn /// tasks on a specific thread. But the wasm task pool just calls /// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread -/// and so the [`ThreadExecutor`] does nothing. -#[derive(Default)] -pub struct ThreadExecutor<'a>(PhantomData<&'a ()>); -impl<'a> ThreadExecutor<'a> { - /// Creates a new `ThreadExecutor` - pub fn new() -> Self { - Self::default() - } -} +/// and so the [`LocalTaskSpawner`] does nothing. +#[derive(Clone)] +pub struct LocalTaskSpawner<'a>(PhantomData<&'a ()>); impl TaskPoolBuilder { /// Creates a new `TaskPoolBuilder` instance @@ -85,8 +70,8 @@ pub struct TaskPool {} impl TaskPool { /// Just create a new `ThreadExecutor` for wasm - pub fn get_thread_executor() -> Arc> { - Arc::new(ThreadExecutor::new()) + pub fn current_thread_spawner(&self) -> LocalTaskSpawner<'static> { + LocalTaskSpawner(PhantomData) } /// Create a `TaskPool` with the default configuration. @@ -113,7 +98,7 @@ impl TaskPool { F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>), T: Send + 'static, { - self.scope_with_executor(false, None, f) + self.scope_with_executor(None, f) } /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, @@ -124,8 +109,7 @@ impl TaskPool { #[expect(unsafe_code, reason = "Required to transmute lifetimes.")] pub fn scope_with_executor<'env, F, T>( &self, - _tick_task_pool_executor: bool, - _thread_executor: Option<&ThreadExecutor>, + _thread_executor: Option, f: F, ) -> Vec where @@ -140,13 +124,13 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = LocalExecutor::new(); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) }; + let executor_ref: &'env Executor<'env> = unsafe { mem::transmute(&EXECUTOR) }; - let results: RefCell>> = RefCell::new(Vec::new()); + // Kept around to ensure that, in the case of an unwinding panic, all scheduled Tasks are cancelled. + let tasks: RefCell>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let results_ref: &'env RefCell>> = unsafe { mem::transmute(&results) }; + let tasks_ref: &'env RefCell>> = unsafe { mem::transmute(&tasks) }; let pending_tasks: Cell = Cell::new(0); // SAFETY: As above, all futures must complete in this function so we can change the lifetime @@ -154,8 +138,8 @@ impl TaskPool { let mut scope = Scope { executor_ref, + tasks_ref, pending_tasks, - results_ref, scope: PhantomData, env: PhantomData, }; @@ -166,16 +150,17 @@ impl TaskPool { f(scope_ref); // Wait until the scope is complete - block_on(executor.run(async { + block_on(executor_ref.run(async { while pending_tasks.get() != 0 { futures_lite::future::yield_now().await; } })); - results + tasks .take() .into_iter() - .map(|result| result.unwrap()) + .map(now_or_never) + .map(Option::unwrap) .collect() } @@ -195,20 +180,17 @@ impl TaskPool { crate::cfg::switch! {{ crate::cfg::web => { Task::wrap_future(future) - } - crate::cfg::std => { - LOCAL_EXECUTOR.with(|executor| { - let task = executor.spawn(future); - // Loop until all tasks are done - while executor.try_tick() {} - - Task::new(task) - }) - } + } _ => { - let task = LOCAL_EXECUTOR.spawn(future); + let task = EXECUTOR.spawn_local(future); // Loop until all tasks are done - while LOCAL_EXECUTOR.try_tick() {} + crate::cfg::bevy_executor! { + if { + while !Executor::try_tick_local() {} + } else { + while EXECUTOR.try_tick() {} + } + } Task::new(task) } @@ -225,43 +207,17 @@ impl TaskPool { { self.spawn(future) } - - /// Runs a function with the local executor. Typically used to tick - /// the local executor on the main thread as it needs to share time with - /// other things. - /// - /// ``` - /// use bevy_tasks::TaskPool; - /// - /// TaskPool::new().with_local_executor(|local_executor| { - /// local_executor.try_tick(); - /// }); - /// ``` - pub fn with_local_executor(&self, f: F) -> R - where - F: FnOnce(&Executor) -> R, - { - crate::cfg::switch! {{ - crate::cfg::std => { - LOCAL_EXECUTOR.with(f) - } - _ => { - f(&LOCAL_EXECUTOR) - } - }} - } } /// A `TaskPool` scope for running one or more non-`'static` futures. /// /// For more information, see [`TaskPool::scope`]. -#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor_ref: &'scope LocalExecutor<'scope>, + executor_ref: &'scope Executor<'scope>, // The number of pending tasks spawned on the scope pending_tasks: &'scope Cell, // Vector to gather results of all futures spawned during scope run - results_ref: &'env RefCell>>, + tasks_ref: &'env RefCell>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -301,28 +257,33 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { let pending_tasks = self.pending_tasks; pending_tasks.update(|i| i + 1); - // add a spot to keep the result, and record the index - let results_ref = self.results_ref; - let mut results = results_ref.borrow_mut(); - let task_number = results.len(); - results.push(None); - drop(results); - // create the job closure let f = async move { let result = f.await; - // store the result in the allocated slot - let mut results = results_ref.borrow_mut(); - results[task_number] = Some(result); - drop(results); - // decrement the pending tasks count pending_tasks.update(|i| i - 1); + + result }; - // spawn the job itself - self.executor_ref.spawn(f).detach(); + let mut tasks = self.tasks_ref.borrow_mut(); + + #[expect(unsafe_code, reason = "Executor::spawn_local_scoped is unsafe")] + // SAFETY: The surrounding scope will not terminate until all local tasks are done + // ensuring that the borrowed variables do not outlive the detached task. + tasks.push(unsafe { self.executor_ref.spawn_local_scoped(f) }); + } +} + +impl <'scope, 'env: 'scope, T> fmt::Debug for Scope<'scope, 'env, T> +where T: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Scope") + .field("pending_tasks", &self.pending_tasks) + .field("tasks_ref", &self.tasks_ref) + .finish() } } diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index e70ab1d18b3c8..73b82b80660c9 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -38,7 +38,9 @@ cfg::web! { spawn_local(async move { // Catch any panics that occur when polling the future so they can // be propagated back to the task handle. - let value = CatchUnwind(AssertUnwindSafe(future)).await; + let value = CatchUnwind { + inner: AssertUnwindSafe(future) + }.await; let _ = sender.send(value).await; }); Self(receiver) @@ -173,13 +175,17 @@ cfg::web! { type Panic = Box; - #[pin_project::pin_project] - struct CatchUnwind(#[pin] F); + pin_project_lite::pin_project! { + struct CatchUnwind { + #[pin] + inner: F + } + } impl Future for CatchUnwind { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let f = AssertUnwindSafe(|| self.project().0.poll(cx)); + let f = AssertUnwindSafe(|| self.project().inner.poll(cx)); let result = cfg::std! { if { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index eb6f8502b3d80..159a28366716e 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,20 +1,16 @@ use alloc::{boxed::Box, format, string::String, vec::Vec}; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; -use std::{ - thread::{self, JoinHandle}, - thread_local, -}; +use std::thread::{self, JoinHandle}; -use crate::executor::FallibleTask; +use crate::bevy_executor::Executor; +use async_task::FallibleTask; use bevy_platform::sync::Arc; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; -use crate::{ - block_on, - thread_executor::{ThreadExecutor, ThreadExecutorTicker}, - Task, -}; +use crate::{block_on, Task}; + +pub use crate::bevy_executor::LocalTaskSpawner; struct CallOnDrop(Option>); @@ -134,7 +130,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. - executor: Arc>, + executor: Arc>, // The inner state of the pool. threads: Vec>, @@ -142,14 +138,10 @@ pub struct TaskPool { } impl TaskPool { - thread_local! { - static LOCAL_EXECUTOR: crate::executor::LocalExecutor<'static> = const { crate::executor::LocalExecutor::new() }; - static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); - } - - /// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock - pub fn get_thread_executor() -> Arc> { - Self::THREAD_EXECUTOR.with(Clone::clone) + /// Creates a [`LocalTaskSpawner`] for this current thread of execution. + /// Can be used to spawn new tasks to execute exclusively on this thread. + pub fn current_thread_spawner(&self) -> LocalTaskSpawner<'static> { + self.executor.current_thread_spawner() } /// Create a `TaskPool` with the default configuration. @@ -160,7 +152,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(crate::executor::Executor::new()); + let executor = Arc::new(Executor::new()); let num_threads = builder .num_threads @@ -187,28 +179,22 @@ impl TaskPool { thread_builder .spawn(move || { - TaskPool::LOCAL_EXECUTOR.with(|local_executor| { - if let Some(on_thread_spawn) = on_thread_spawn { - on_thread_spawn(); - drop(on_thread_spawn); - } - let _destructor = CallOnDrop(on_thread_destroy); - loop { - let res = std::panic::catch_unwind(|| { - let tick_forever = async move { - loop { - local_executor.tick().await; - } - }; - block_on(ex.run(tick_forever.or(shutdown_rx.recv()))) - }); - if let Ok(value) = res { - // Use unwrap_err because we expect a Closed error - value.unwrap_err(); - break; - } + crate::bevy_executor::install_runtime_into_current_thread(&ex); + + if let Some(on_thread_spawn) = on_thread_spawn { + on_thread_spawn(); + drop(on_thread_spawn); + } + let _destructor = CallOnDrop(on_thread_destroy); + loop { + let res = + std::panic::catch_unwind(|| block_on(ex.run(shutdown_rx.recv()))); + if let Ok(value) = res { + // Use unwrap_err because we expect a Closed error + value.unwrap_err(); + break; } - }); + } }) .expect("Failed to spawn thread.") }) @@ -312,57 +298,39 @@ impl TaskPool { F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|scope_executor| { - self.scope_with_executor_inner(true, scope_executor, scope_executor, f) - }) + let scope_spawner = self.current_thread_spawner(); + self.scope_with_executor_inner(scope_spawner.clone(), scope_spawner, f) } - /// This allows passing an external executor to spawn tasks on. When you pass an external executor - /// [`Scope::spawn_on_scope`] spawns is then run on the thread that [`ThreadExecutor`] is being ticked on. - /// If [`None`] is passed the scope will use a [`ThreadExecutor`] that is ticked on the current thread. - /// - /// When `tick_task_pool_executor` is set to `true`, the multithreaded task stealing executor is ticked on the scope - /// thread. Disabling this can be useful when finishing the scope is latency sensitive. Pulling tasks from - /// global executor can run tasks unrelated to the scope and delay when the scope returns. + /// This allows passing an external [`LocalTaskSpawner`] to spawn tasks to. When you pass an external spawner + /// [`Scope::spawn_on_scope`] spawns is then run on the thread that [`LocalTaskSpawner`] originated from. + /// If [`None`] is passed the scope will use a [`LocalTaskSpawner`] that is ticked on the current thread. /// /// See [`Self::scope`] for more details in general about how scopes work. pub fn scope_with_executor<'env, F, T>( &self, - tick_task_pool_executor: bool, - external_executor: Option<&ThreadExecutor>, + external_spawner: Option>, f: F, ) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|scope_executor| { - // If an `external_executor` is passed, use that. Otherwise, get the executor stored - // in the `THREAD_EXECUTOR` thread local. - if let Some(external_executor) = external_executor { - self.scope_with_executor_inner( - tick_task_pool_executor, - external_executor, - scope_executor, - f, - ) - } else { - self.scope_with_executor_inner( - tick_task_pool_executor, - scope_executor, - scope_executor, - f, - ) - } - }) + let scope_spawner = self.executor.current_thread_spawner(); + // If an `external_executor` is passed, use that. Otherwise, get the executor stored + // in the `THREAD_EXECUTOR` thread local. + if let Some(external_spawner) = external_spawner { + self.scope_with_executor_inner(external_spawner, scope_spawner, f) + } else { + self.scope_with_executor_inner(scope_spawner.clone(), scope_spawner, f) + } } #[expect(unsafe_code, reason = "Required to transmute lifetimes.")] fn scope_with_executor_inner<'env, F, T>( &self, - tick_task_pool_executor: bool, - external_executor: &ThreadExecutor, - scope_executor: &ThreadExecutor, + external_spawner: LocalTaskSpawner<'env>, + scope_spawner: LocalTaskSpawner<'env>, f: F, ) -> Vec where @@ -376,26 +344,20 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor: &crate::executor::Executor = &self.executor; - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env crate::executor::Executor = unsafe { mem::transmute(executor) }; - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let external_executor: &'env ThreadExecutor<'env> = - unsafe { mem::transmute(external_executor) }; + let executor: &Executor = &self.executor; // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let scope_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(scope_executor) }; + let executor: &'env Executor = unsafe { mem::transmute(executor) }; let spawned: ConcurrentQueue>>> = ConcurrentQueue::unbounded(); // shadow the variable so that the owned value cannot be used for the rest of the function // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let spawned: &'env ConcurrentQueue< - FallibleTask>>, - > = unsafe { mem::transmute(&spawned) }; + let spawned: &'env ConcurrentQueue>>> = + unsafe { mem::transmute(&spawned) }; let scope = Scope { executor, - external_executor, - scope_executor, + external_spawner, + scope_spawner, spawned, scope: PhantomData, env: PhantomData, @@ -410,144 +372,20 @@ impl TaskPool { if spawned.is_empty() { Vec::new() } else { - block_on(async move { - let get_results = async { - let mut results = Vec::with_capacity(spawned.len()); - while let Ok(task) = spawned.pop() { - if let Some(res) = task.await { - match res { - Ok(res) => results.push(res), - Err(payload) => std::panic::resume_unwind(payload), - } - } else { - panic!("Failed to catch panic!"); - } - } - results - }; - - let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); - - // we get this from a thread local so we should always be on the scope executors thread. - // note: it is possible `scope_executor` and `external_executor` is the same executor, - // in that case, we should only tick one of them, otherwise, it may cause deadlock. - let scope_ticker = scope_executor.ticker().unwrap(); - let external_ticker = if !external_executor.is_same(scope_executor) { - external_executor.ticker() - } else { - None - }; - - match (external_ticker, tick_task_pool_executor) { - (Some(external_ticker), true) => { - Self::execute_global_external_scope( - executor, - external_ticker, - scope_ticker, - get_results, - ) - .await - } - (Some(external_ticker), false) => { - Self::execute_external_scope(external_ticker, scope_ticker, get_results) - .await + block_on(self.executor.run(async move { + let mut results = Vec::with_capacity(spawned.len()); + while let Ok(task) = spawned.pop() { + match task.await { + Some(Ok(res)) => results.push(res), + Some(Err(payload)) => std::panic::resume_unwind(payload), + None => panic!("Failed to catch panic!"), } - // either external_executor is none or it is same as scope_executor - (None, true) => { - Self::execute_global_scope(executor, scope_ticker, get_results).await - } - (None, false) => Self::execute_scope(scope_ticker, get_results).await, } - }) + results + })) } } - #[inline] - async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope crate::executor::Executor<'scope>, - external_ticker: ThreadExecutorTicker<'scope, 'ticker>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - // we restart the executors if a task errors. if a scoped - // task errors it will panic the scope on the call to get_results - let execute_forever = async move { - loop { - let tick_forever = async { - loop { - external_ticker.tick().or(scope_ticker.tick()).await; - } - }; - // we don't care if it errors. If a scoped task errors it will propagate - // to get_results - let _result = AssertUnwindSafe(executor.run(tick_forever)) - .catch_unwind() - .await - .is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_external_scope<'scope, 'ticker, T>( - external_ticker: ThreadExecutorTicker<'scope, 'ticker>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - external_ticker.tick().or(scope_ticker.tick()).await; - } - }; - let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope crate::executor::Executor<'scope>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - scope_ticker.tick().await; - } - }; - let _result = AssertUnwindSafe(executor.run(tick_forever)) - .catch_unwind() - .await - .is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_scope<'scope, 'ticker, T>( - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - scope_ticker.tick().await; - } - }; - let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); - } - }; - get_results.or(execute_forever).await - } - /// Spawns a static future onto the thread pool. The returned [`Task`] is a /// future that can be polled for the result. It can also be canceled and /// "detached", allowing the task to continue running even if dropped. In @@ -578,25 +416,7 @@ impl TaskPool { where T: 'static, { - Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) - } - - /// Runs a function with the local executor. Typically used to tick - /// the local executor on the main thread as it needs to share time with - /// other things. - /// - /// ``` - /// use bevy_tasks::TaskPool; - /// - /// TaskPool::new().with_local_executor(|local_executor| { - /// local_executor.try_tick(); - /// }); - /// ``` - pub fn with_local_executor(&self, f: F) -> R - where - F: FnOnce(&crate::executor::LocalExecutor) -> R, - { - Self::LOCAL_EXECUTOR.with(f) + Task::new(self.executor.spawn_local(future)) } } @@ -625,9 +445,9 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope crate::executor::Executor<'scope>, - external_executor: &'scope ThreadExecutor<'scope>, - scope_executor: &'scope ThreadExecutor<'scope>, + executor: &'scope Executor<'scope>, + external_spawner: LocalTaskSpawner<'scope>, + scope_spawner: LocalTaskSpawner<'scope>, spawned: &'scope ConcurrentQueue>>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -648,11 +468,14 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { .executor .spawn(AssertUnwindSafe(f).catch_unwind()) .fallible(); - // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } + #[expect( + unsafe_code, + reason = "LocalTaskSpawner::spawn otherwise requires 'static Futures" + )] /// Spawns a scoped future onto the thread the scope is run on. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. Users should generally prefer to use @@ -660,15 +483,21 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { - let task = self - .scope_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) - .fallible(); - // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + // SAFETY: The scope call that generated this `Scope` ensures that the created + // Task does not outlive 'scope. + let task = unsafe { + self.scope_spawner + .spawn_scoped(AssertUnwindSafe(f).catch_unwind()) + .fallible() + }; + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } + #[expect( + unsafe_code, + reason = "LocalTaskSpawner::spawn otherwise requires 'static Futures" + )] /// Spawns a scoped future onto the thread of the external thread executor. /// This is typically the main thread. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of @@ -677,13 +506,17 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_external + 'scope + Send>(&self, f: Fut) { - let task = self - .external_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) - .fallible(); + // SAFETY: The scope call that generated this `Scope` ensures that the created + // Task does not outlive 'scope. + let task = unsafe { + self.external_spawner + .spawn_scoped(AssertUnwindSafe(f).catch_unwind()) + .fallible() + }; // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + // close and use an unbounded queue, so pushing should always succeed. + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } } diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs deleted file mode 100644 index 86d2ab280d87c..0000000000000 --- a/crates/bevy_tasks/src/thread_executor.rs +++ /dev/null @@ -1,133 +0,0 @@ -use core::marker::PhantomData; -use std::thread::{self, ThreadId}; - -use crate::executor::Executor; -use async_task::Task; -use futures_lite::Future; - -/// An executor that can only be ticked on the thread it was instantiated on. But -/// can spawn `Send` tasks from other threads. -/// -/// # Example -/// ``` -/// # use std::sync::{Arc, atomic::{AtomicI32, Ordering}}; -/// use bevy_tasks::ThreadExecutor; -/// -/// let thread_executor = ThreadExecutor::new(); -/// let count = Arc::new(AtomicI32::new(0)); -/// -/// // create some owned values that can be moved into another thread -/// let count_clone = count.clone(); -/// -/// std::thread::scope(|scope| { -/// scope.spawn(|| { -/// // we cannot get the ticker from another thread -/// let not_thread_ticker = thread_executor.ticker(); -/// assert!(not_thread_ticker.is_none()); -/// -/// // but we can spawn tasks from another thread -/// thread_executor.spawn(async move { -/// count_clone.fetch_add(1, Ordering::Relaxed); -/// }).detach(); -/// }); -/// }); -/// -/// // the tasks do not make progress unless the executor is manually ticked -/// assert_eq!(count.load(Ordering::Relaxed), 0); -/// -/// // tick the ticker until task finishes -/// let thread_ticker = thread_executor.ticker().unwrap(); -/// thread_ticker.try_tick(); -/// assert_eq!(count.load(Ordering::Relaxed), 1); -/// ``` -#[derive(Debug)] -pub struct ThreadExecutor<'task> { - executor: Executor<'task>, - thread_id: ThreadId, -} - -impl<'task> Default for ThreadExecutor<'task> { - fn default() -> Self { - Self { - executor: Executor::new(), - thread_id: thread::current().id(), - } - } -} - -impl<'task> ThreadExecutor<'task> { - /// create a new [`ThreadExecutor`] - pub fn new() -> Self { - Self::default() - } - - /// Spawn a task on the thread executor - pub fn spawn( - &self, - future: impl Future + Send + 'task, - ) -> Task { - self.executor.spawn(future) - } - - /// Gets the [`ThreadExecutorTicker`] for this executor. - /// Use this to tick the executor. - /// It only returns the ticker if it's on the thread the executor was created on - /// and returns `None` otherwise. - pub fn ticker<'ticker>(&'ticker self) -> Option> { - if thread::current().id() == self.thread_id { - return Some(ThreadExecutorTicker { - executor: self, - _marker: PhantomData, - }); - } - None - } - - /// Returns true if `self` and `other`'s executor is same - pub fn is_same(&self, other: &Self) -> bool { - core::ptr::eq(self, other) - } -} - -/// Used to tick the [`ThreadExecutor`]. The executor does not -/// make progress unless it is manually ticked on the thread it was -/// created on. -#[derive(Debug)] -pub struct ThreadExecutorTicker<'task, 'ticker> { - executor: &'ticker ThreadExecutor<'task>, - // make type not send or sync - _marker: PhantomData<*const ()>, -} - -impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> { - /// Tick the thread executor. - pub async fn tick(&self) { - self.executor.executor.tick().await; - } - - /// Synchronously try to tick a task on the executor. - /// Returns false if does not find a task to tick. - pub fn try_tick(&self) -> bool { - self.executor.executor.try_tick() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloc::sync::Arc; - - #[test] - fn test_ticker() { - let executor = Arc::new(ThreadExecutor::new()); - let ticker = executor.ticker(); - assert!(ticker.is_some()); - - thread::scope(|s| { - s.spawn(|| { - let ticker = executor.ticker(); - assert!(ticker.is_none()); - }); - }); - } -} diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 40cabd76ac5e6..b6da4ac7b860e 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -74,36 +74,3 @@ taskpool! { /// See [`TaskPool`] documentation for details on Bevy tasks. (IO_TASK_POOL, IoTaskPool) } - -crate::cfg::web! { - if {} else { - /// A function used by `bevy_app` to tick the global tasks pools on the main thread. - /// This will run a maximum of 100 local tasks per executor per call to this function. - /// - /// # Warning - /// - /// This function *must* be called on the main thread, or the task pools will not be updated appropriately. - pub fn tick_global_task_pools_on_main_thread() { - COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); - }); - } - } -} diff --git a/crates/bevy_transform/Cargo.toml b/crates/bevy_transform/Cargo.toml index 42173c0f1776b..d593532e1963d 100644 --- a/crates/bevy_transform/Cargo.toml +++ b/crates/bevy_transform/Cargo.toml @@ -33,7 +33,7 @@ approx = "0.5.1" [features] # Turning off default features leaves you with a barebones # definition of transform. -default = ["std", "bevy-support", "bevy_reflect", "async_executor"] +default = ["std", "bevy-support", "bevy_reflect"] # Functionality @@ -55,12 +55,6 @@ bevy_reflect = [ "bevy_app/bevy_reflect", ] -# Executor Backend - -## Uses `async-executor` as a task execution backend. -## This backend is incompatible with `no_std` targets. -async_executor = ["std", "bevy_tasks/async_executor"] - # Platform Compatibility ## Allows access to the `std` crate. Enabling this feature will prevent compilation diff --git a/crates/bevy_winit/src/state.rs b/crates/bevy_winit/src/state.rs index 0eef31fb96640..94d0d8889064b 100644 --- a/crates/bevy_winit/src/state.rs +++ b/crates/bevy_winit/src/state.rs @@ -15,8 +15,6 @@ use bevy_input::{ use bevy_log::{trace, warn}; use bevy_math::{ivec2, DVec2, Vec2}; use bevy_platform::time::Instant; -#[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::tick_global_task_pools_on_main_thread; use core::marker::PhantomData; #[cfg(target_arch = "wasm32")] use winit::platform::web::EventLoopExtWebSys; @@ -153,10 +151,7 @@ impl ApplicationHandler for WinitAppRunnerState { let _span = tracing::info_span!("winit event_handler").entered(); if self.app.plugins_state() != PluginsState::Cleaned { - if self.app.plugins_state() != PluginsState::Ready { - #[cfg(not(target_arch = "wasm32"))] - tick_global_task_pools_on_main_thread(); - } else { + if self.app.plugins_state() == PluginsState::Ready { self.app.finish(); self.app.cleanup(); } diff --git a/docs/cargo_features.md b/docs/cargo_features.md index 2f4bbd95e5c0b..8cd55bb514a39 100644 --- a/docs/cargo_features.md +++ b/docs/cargo_features.md @@ -14,7 +14,6 @@ The default feature set enables most of the expected features of a game engine, |android-game-activity|Android GameActivity support. Default, choose between this and `android-native-activity`.| |android_shared_stdcxx|Enable using a shared stdlib for cxx on Android| |animation|Enable animation support, and glTF animation loading| -|async_executor|Uses `async-executor` as a task execution backend.| |bevy_animation|Provides animation functionality| |bevy_anti_alias|Provides various anti aliasing solutions| |bevy_asset|Provides asset functionality|