diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0d0a346b5984..ddf62562dca19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,6 +91,13 @@ jobs: miri: runs-on: macos-latest timeout-minutes: 60 + env: + # -Zrandomize-layout makes sure we dont rely on the layout of anything that might change + RUSTFLAGS: -Zrandomize-layout + # https://github.com/rust-lang/miri#miri--z-flags-and-environment-variables + # -Zmiri-disable-isolation is needed because our executor uses `fastrand` which accesses system time. + # -Zmiri-ignore-leaks is necessary because a bunch of tests don't join all threads before finishing. + MIRIFLAGS: -Zmiri-ignore-leaks -Zmiri-disable-isolation steps: - uses: actions/checkout@v5 - uses: actions/cache/restore@v4 @@ -111,17 +118,14 @@ jobs: with: toolchain: ${{ env.NIGHTLY_TOOLCHAIN }} components: miri - - name: CI job + - name: CI job (Tasks) + # To run the tests one item at a time for troubleshooting, use + # cargo --quiet test --lib -- --list | sed 's/: test$//' | MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-weak-memory-emulation" xargs -n1 cargo miri test -p bevy_tasks --lib -- --exact + run: cargo miri test -p bevy_tasks --features bevy_executor --features multi_threaded + - name: CI job (ECS) # To run the tests one item at a time for troubleshooting, use # cargo --quiet test --lib -- --list | sed 's/: test$//' | MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-disable-weak-memory-emulation" xargs -n1 cargo miri test -p bevy_ecs --lib -- --exact run: cargo miri test -p bevy_ecs --features bevy_utils/debug - env: - # -Zrandomize-layout makes sure we dont rely on the layout of anything that might change - RUSTFLAGS: -Zrandomize-layout - # https://github.com/rust-lang/miri#miri--z-flags-and-environment-variables - # -Zmiri-disable-isolation is needed because our executor uses `fastrand` which accesses system time. - # -Zmiri-ignore-leaks is necessary because a bunch of tests don't join all threads before finishing. - MIRIFLAGS: -Zmiri-ignore-leaks -Zmiri-disable-isolation check-compiles: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index dbf0401117928..97ded15ff9209 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,7 +127,6 @@ unused_qualifications = "warn" [features] default = [ "std", - "async_executor", "android-game-activity", "android_shared_stdcxx", "animation", @@ -566,9 +565,6 @@ custom_cursor = ["bevy_internal/custom_cursor"] # Experimental support for nodes that are ignored for UI layouting ghost_nodes = ["bevy_internal/ghost_nodes"] -# Uses `async-executor` as a task execution backend. -async_executor = ["std", "bevy_internal/async_executor"] - # Allows access to the `std` crate. std = ["bevy_internal/std"] diff --git a/benches/benches/bevy_ecs/iteration/heavy_compute.rs b/benches/benches/bevy_ecs/iteration/heavy_compute.rs index e057b20a431be..21483463cdd96 100644 --- a/benches/benches/bevy_ecs/iteration/heavy_compute.rs +++ b/benches/benches/bevy_ecs/iteration/heavy_compute.rs @@ -1,5 +1,5 @@ use bevy_ecs::prelude::*; -use bevy_tasks::{ComputeTaskPool, TaskPool}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use criterion::Criterion; use glam::*; @@ -20,7 +20,7 @@ pub fn heavy_compute(c: &mut Criterion) { group.warm_up_time(core::time::Duration::from_millis(500)); group.measurement_time(core::time::Duration::from_secs(4)); group.bench_function("base", |b| { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::default(); diff --git a/benches/benches/bevy_ecs/iteration/par_iter_simple.rs b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs index 92259cb98fecf..089a9854e76d4 100644 --- a/benches/benches/bevy_ecs/iteration/par_iter_simple.rs +++ b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs @@ -1,5 +1,5 @@ use bevy_ecs::prelude::*; -use bevy_tasks::{ComputeTaskPool, TaskPool}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use glam::*; #[derive(Component, Copy, Clone)] @@ -26,7 +26,7 @@ fn insert_if_bit_enabled(entity: &mut EntityWorldMut, i: u16) { impl<'w> Benchmark<'w> { pub fn new(fragment: u16) -> Self { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::new(); diff --git a/benches/benches/bevy_ecs/iteration/par_iter_simple_foreach_hybrid.rs b/benches/benches/bevy_ecs/iteration/par_iter_simple_foreach_hybrid.rs index 9dbcba87852f7..8b90783dc5c51 100644 --- a/benches/benches/bevy_ecs/iteration/par_iter_simple_foreach_hybrid.rs +++ b/benches/benches/bevy_ecs/iteration/par_iter_simple_foreach_hybrid.rs @@ -1,5 +1,5 @@ use bevy_ecs::prelude::*; -use bevy_tasks::{ComputeTaskPool, TaskPool}; +use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use rand::{prelude::SliceRandom, SeedableRng}; use rand_chacha::ChaCha8Rng; @@ -18,7 +18,7 @@ pub struct Benchmark<'w>(World, QueryState<(&'w mut TableData, &'w SparseData)>) impl<'w> Benchmark<'w> { pub fn new() -> Self { let mut world = World::new(); - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut v = vec![]; for _ in 0..100000 { diff --git a/crates/bevy_a11y/Cargo.toml b/crates/bevy_a11y/Cargo.toml index 4d36eeecc1011..1a2d255cd879a 100644 --- a/crates/bevy_a11y/Cargo.toml +++ b/crates/bevy_a11y/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" keywords = ["bevy", "accessibility", "a11y"] [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor"] +default = ["std", "bevy_reflect"] # Functionality diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index 789debfac3578..92c703a0f1125 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1476,8 +1476,7 @@ type RunnerFn = Box AppExit>; fn run_once(mut app: App) -> AppExit { while app.plugins_state() == PluginsState::Adding { - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - bevy_tasks::tick_global_task_pools_on_main_thread(); + core::hint::spin_loop(); } app.finish(); app.cleanup(); diff --git a/crates/bevy_app/src/schedule_runner.rs b/crates/bevy_app/src/schedule_runner.rs index 594f849b2f905..7e987cfe58be1 100644 --- a/crates/bevy_app/src/schedule_runner.rs +++ b/crates/bevy_app/src/schedule_runner.rs @@ -77,8 +77,7 @@ impl Plugin for ScheduleRunnerPlugin { let plugins_state = app.plugins_state(); if plugins_state != PluginsState::Cleaned { while app.plugins_state() == PluginsState::Adding { - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - bevy_tasks::tick_global_task_pools_on_main_thread(); + core::hint::spin_loop(); } app.finish(); app.cleanup(); diff --git a/crates/bevy_app/src/task_pool_plugin.rs b/crates/bevy_app/src/task_pool_plugin.rs index 8014790f07772..c2e2db7d21c05 100644 --- a/crates/bevy_app/src/task_pool_plugin.rs +++ b/crates/bevy_app/src/task_pool_plugin.rs @@ -6,21 +6,6 @@ use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuil use core::fmt::Debug; use log::trace; -cfg_if::cfg_if! { - if #[cfg(not(all(target_arch = "wasm32", feature = "web")))] { - use {crate::Last, bevy_tasks::tick_global_task_pools_on_main_thread}; - use bevy_ecs::system::NonSendMarker; - - /// A system used to check and advanced our task pools. - /// - /// Calls [`tick_global_task_pools_on_main_thread`], - /// and uses [`NonSendMarker`] to ensure that this system runs on the main thread - fn tick_global_task_pools(_main_thread_marker: NonSendMarker) { - tick_global_task_pools_on_main_thread(); - } - } -} - /// Setup of default task pools: [`AsyncComputeTaskPool`], [`ComputeTaskPool`], [`IoTaskPool`]. #[derive(Default)] pub struct TaskPoolPlugin { @@ -32,9 +17,6 @@ impl Plugin for TaskPoolPlugin { fn build(&self, _app: &mut App) { // Setup the default bevy task pools self.task_pool_options.create_default_pools(); - - #[cfg(not(all(target_arch = "wasm32", feature = "web")))] - _app.add_systems(Last, tick_global_task_pools); } } @@ -190,7 +172,7 @@ impl TaskPoolOptions { builder }; - builder.build() + builder }); } @@ -220,7 +202,7 @@ impl TaskPoolOptions { builder }; - builder.build() + builder }); } @@ -250,7 +232,7 @@ impl TaskPoolOptions { builder }; - builder.build() + builder }); } } diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index bde2ad5e033c3..c1c1801b885b3 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -30,9 +30,7 @@ bevy_ecs = { path = "../bevy_ecs", version = "0.18.0-dev", default-features = fa bevy_reflect = { path = "../bevy_reflect", version = "0.18.0-dev", default-features = false, features = [ "uuid", ] } -bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", default-features = false, features = [ - "async_executor", -] } +bevy_tasks = { path = "../bevy_tasks", version = "0.18.0-dev", default-features = false } bevy_utils = { path = "../bevy_utils", version = "0.18.0-dev", default-features = false } bevy_platform = { path = "../bevy_platform", version = "0.18.0-dev", default-features = false, features = [ "std", diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index eb39ed859e33a..394ee0c56f172 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"] rust-version = "1.86.0" [features] -default = ["std", "bevy_reflect", "async_executor", "backtrace"] +default = ["std", "bevy_reflect", "backtrace"] # Functionality @@ -49,12 +49,6 @@ bevy_debug_stepping = [] ## This will often provide more detailed error messages. track_location = [] -# Executor Backend - -## Uses `async-executor` as a task execution backend. -## This backend is incompatible with `no_std` targets. -async_executor = ["std", "bevy_tasks/async_executor"] - # Platform Compatibility ## Allows access to the `std` crate. Enabling this feature will prevent compilation diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index d34932cb06751..c40b7304386f6 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -170,7 +170,7 @@ mod tests { }; use alloc::{string::String, sync::Arc, vec, vec::Vec}; use bevy_platform::collections::HashSet; - use bevy_tasks::{ComputeTaskPool, TaskPool}; + use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use core::{ any::TypeId, marker::PhantomData, @@ -495,7 +495,7 @@ mod tests { #[test] fn par_for_each_dense() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::new(); let e1 = world.spawn(A(1)).id(); let e2 = world.spawn(A(2)).id(); @@ -517,7 +517,7 @@ mod tests { #[test] fn par_for_each_sparse() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::new(); let e1 = world.spawn(SparseStored(1)).id(); let e2 = world.spawn(SparseStored(2)).id(); diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 09821a718c668..9e19417d6723d 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1345,7 +1345,7 @@ impl QueryState { /// #[derive(Component, PartialEq, Debug)] /// struct A(usize); /// - /// # bevy_tasks::ComputeTaskPool::get_or_init(|| bevy_tasks::TaskPool::new()); + /// # bevy_tasks::ComputeTaskPool::get_or_init(|| bevy_tasks::TaskPoolBuilder::default()); /// /// let mut world = World::new(); /// diff --git a/crates/bevy_ecs/src/schedule/executor/mod.rs b/crates/bevy_ecs/src/schedule/executor/mod.rs index 7d595aeec1a8d..4c23522414601 100644 --- a/crates/bevy_ecs/src/schedule/executor/mod.rs +++ b/crates/bevy_ecs/src/schedule/executor/mod.rs @@ -9,7 +9,7 @@ use core::any::TypeId; pub use self::single_threaded::SingleThreadedExecutor; #[cfg(feature = "std")] -pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor}; +pub use self::multi_threaded::{MainThreadTaskSpawner, MultiThreadedExecutor}; use fixedbitset::FixedBitSet; diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 497b937c31f51..8e3b043d7182f 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -1,7 +1,6 @@ use alloc::{boxed::Box, vec::Vec}; use bevy_platform::cell::SyncUnsafeCell; -use bevy_platform::sync::Arc; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, LocalTaskSpawner, Scope, TaskPoolBuilder}; use concurrent_queue::ConcurrentQueue; use core::{any::Any, panic::AssertUnwindSafe}; use fixedbitset::FixedBitSet; @@ -270,14 +269,12 @@ impl SystemExecutor for MultiThreadedExecutor { } let thread_executor = world - .get_resource::() + .get_resource::() .map(|e| e.0.clone()); - let thread_executor = thread_executor.as_deref(); let environment = &Environment::new(self, schedule, world); - ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( - false, + ComputeTaskPool::get_or_init(TaskPoolBuilder::default).scope_with_executor( thread_executor, |scope| { let context = Context { @@ -871,20 +868,20 @@ unsafe fn evaluate_and_fold_conditions( .fold(true, |acc, res| acc && res) } -/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +/// New-typed [`LocalTaskSpawner`] [`Resource`] that is used to run systems on the main thread #[derive(Resource, Clone)] -pub struct MainThreadExecutor(pub Arc>); +pub struct MainThreadTaskSpawner(pub LocalTaskSpawner); -impl Default for MainThreadExecutor { +impl Default for MainThreadTaskSpawner { fn default() -> Self { Self::new() } } -impl MainThreadExecutor { +impl MainThreadTaskSpawner { /// Creates a new executor that can be used to run systems on the main thread. pub fn new() -> Self { - MainThreadExecutor(TaskPool::get_thread_executor()) + MainThreadTaskSpawner(ComputeTaskPool::get().current_thread_spawner()) } } diff --git a/crates/bevy_ecs/src/schedule/mod.rs b/crates/bevy_ecs/src/schedule/mod.rs index a03825f9eda56..2015582659473 100644 --- a/crates/bevy_ecs/src/schedule/mod.rs +++ b/crates/bevy_ecs/src/schedule/mod.rs @@ -111,12 +111,12 @@ mod tests { #[cfg(not(miri))] fn parallel_execution() { use alloc::sync::Arc; - use bevy_tasks::{ComputeTaskPool, TaskPool}; + use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use std::sync::Barrier; let mut world = World::default(); let mut schedule = Schedule::default(); - let thread_count = ComputeTaskPool::get_or_init(TaskPool::default).thread_num(); + let thread_count = ComputeTaskPool::get_or_init(TaskPoolBuilder::default).thread_num(); let barrier = Arc::new(Barrier::new(thread_count)); diff --git a/crates/bevy_input/Cargo.toml b/crates/bevy_input/Cargo.toml index cf6d1a3dd9add..b0e1bcb2f8585 100644 --- a/crates/bevy_input/Cargo.toml +++ b/crates/bevy_input/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor", "smol_str"] +default = ["std", "bevy_reflect", "smol_str"] # Functionality diff --git a/crates/bevy_input_focus/Cargo.toml b/crates/bevy_input_focus/Cargo.toml index 1766ee2243b31..e03a2e0d21fe1 100644 --- a/crates/bevy_input_focus/Cargo.toml +++ b/crates/bevy_input_focus/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["bevy"] rust-version = "1.85.0" [features] -default = ["std", "bevy_reflect", "bevy_ecs/async_executor"] +default = ["std", "bevy_reflect"] # Functionality diff --git a/crates/bevy_internal/Cargo.toml b/crates/bevy_internal/Cargo.toml index 3a43a38147c50..0ed62327171db 100644 --- a/crates/bevy_internal/Cargo.toml +++ b/crates/bevy_internal/Cargo.toml @@ -409,15 +409,6 @@ libm = [ "bevy_window?/libm", ] -# Uses `async-executor` as a task execution backend. -# This backend is incompatible with `no_std` targets. -async_executor = [ - "std", - "bevy_tasks/async_executor", - "bevy_ecs/async_executor", - "bevy_transform/async_executor", -] - # Enables use of browser APIs. # Note this is currently only applicable on `wasm32` architectures. web = ["bevy_app/web", "bevy_platform/web", "bevy_reflect/web"] diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs index 3aaa2ac8cc203..455fc65ccffa9 100644 --- a/crates/bevy_render/src/pipelined_rendering.rs +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -3,7 +3,7 @@ use async_channel::{Receiver, Sender}; use bevy_app::{App, AppExit, AppLabel, Plugin, SubApp}; use bevy_ecs::{ resource::Resource, - schedule::MainThreadExecutor, + schedule::MainThreadTaskSpawner, world::{Mut, World}, }; use bevy_tasks::ComputeTaskPool; @@ -114,7 +114,7 @@ impl Plugin for PipelinedRenderingPlugin { if app.get_sub_app(RenderApp).is_none() { return; } - app.insert_resource(MainThreadExecutor::new()); + app.insert_resource(MainThreadTaskSpawner::new()); let mut sub_app = SubApp::new(); sub_app.set_extract(renderer_extract); @@ -136,7 +136,7 @@ impl Plugin for PipelinedRenderingPlugin { .expect("Unable to get RenderApp. Another plugin may have removed the RenderApp before PipelinedRenderingPlugin"); // clone main thread executor to render world - let executor = app.world().get_resource::().unwrap(); + let executor = app.world().get_resource::().unwrap(); render_app.world_mut().insert_resource(executor.clone()); render_to_app_sender.send_blocking(render_app).unwrap(); @@ -181,12 +181,12 @@ impl Plugin for PipelinedRenderingPlugin { // This function waits for the rendering world to be received, // runs extract, and then sends the rendering world back to the render thread. fn renderer_extract(app_world: &mut World, _world: &mut World) { - app_world.resource_scope(|world, main_thread_executor: Mut| { + app_world.resource_scope(|world, main_thread_executor: Mut| { world.resource_scope(|world, mut render_channels: Mut| { // we use a scope here to run any main thread tasks that the render world still needs to run // while we wait for the render world to be received. if let Some(mut render_app) = ComputeTaskPool::get() - .scope_with_executor(true, Some(&*main_thread_executor.0), |s| { + .scope_with_executor(Some(main_thread_executor.0.clone()), |s| { s.spawn(async { render_channels.recv().await }); }) .pop() diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index abe5a1485a6ad..f9cda7aa9d2cb 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,20 +9,24 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["async_executor", "futures-lite"] +default = ["futures-lite"] # Enables multi-threading support. # Without this feature, all tasks will be run on a single thread. -multi_threaded = [ - "bevy_platform/std", - "dep:async-channel", - "dep:concurrent-queue", - "async_executor", -] +multi_threaded = ["bevy_platform/std", "dep:async-channel", "bevy_executor"] -# Uses `async-executor` as a task execution backend. +# Uses a Bevy-specific fork of `async-executor` as a task execution backend. # This backend is incompatible with `no_std` targets. -async_executor = ["bevy_platform/std", "dep:async-executor", "futures-lite"] +bevy_executor = [ + "dep:fastrand", + "dep:slab", + "dep:thread_local", + "dep:crossbeam-utils", + "dep:pin-project-lite", + "futures-lite", + "async-task/std", + "concurrent-queue/std", +] # Provide an implementation of `block_on` from `futures-lite`. futures-lite = ["bevy_platform/std", "futures-lite/std"] @@ -44,17 +48,18 @@ derive_more = { version = "2", default-features = false, features = [ "deref", "deref_mut", ] } -async-executor = { version = "1.11", optional = true } +slab = { version = "0.4", optional = true } +pin-project-lite = { version = "0.2", optional = true } +thread_local = { version = "1.1", optional = true } +fastrand = { version = "2.3", optional = true, default-features = false } async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } -concurrent-queue = { version = "2.0.0", optional = true } atomic-waker = { version = "1", default-features = false } -crossbeam-queue = { version = "0.3", default-features = false, features = [ - "alloc", -] } +concurrent-queue = { version = "2.5", default-features = false } +crossbeam-utils = { version = "0.8", default-features = false, optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -pin-project = "1" +pin-project-lite = "0.2" async-channel = { version = "2.3.0", default-features = false } [target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies] @@ -73,6 +78,7 @@ futures-lite = { version = "2.0.1", default-features = false, features = [ "std", ] } async-channel = "2.3.0" +async-io = "2.0.0" [lints] workspace = true diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 04815df35e6ed..530489bfaa115 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -14,8 +14,8 @@ a single thread and having that thread await the completion of those tasks. This generating the tasks from a slice of data. This library is intended for games and makes no attempt to ensure fairness or ordering of spawned tasks. -It is based on [`async-executor`][async-executor], a lightweight executor that allows the end user to manage their own threads. -`async-executor` is based on async-task, a core piece of async-std. +It is based on a fork of [`async-executor`][async-executor], a lightweight executor that allows the end user to manage their own threads. +`async-executor` is based on [`async-task`][async-task], a core piece of [`smol`][smol]. ## Usage @@ -40,4 +40,6 @@ To enable `no_std` support in this crate, you will need to disable default featu [bevy]: https://bevy.org [rayon]: https://github.com/rayon-rs/rayon -[async-executor]: https://github.com/stjepang/async-executor +[async-executor]: https://github.com/smol-rs/async-executor +[smol]: https://github.com/smol-rs/smol +[async-task]: https://github.com/smol-rs/async-task diff --git a/crates/bevy_tasks/src/bevy_executor.rs b/crates/bevy_tasks/src/bevy_executor.rs new file mode 100644 index 0000000000000..37c95a44a6930 --- /dev/null +++ b/crates/bevy_tasks/src/bevy_executor.rs @@ -0,0 +1,1084 @@ +//! Fork of `async_executor`. +//! +//! It has been vendored along with its tests to update several outdated dependencies. +//! +//! [`async_executor`]: https://github.com/smol-rs/async-executor + +#![expect( + unsafe_code, + reason = "Executor code requires unsafe code for dealing with non-'static lifetimes" +)] +#![allow( + dead_code, + reason = "Not all functions are used with every feature combination" +)] + +use core::panic::{RefUnwindSafe, UnwindSafe}; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use core::task::{Context, Poll, Waker}; +use core::cell::UnsafeCell; +use core::mem; +use std::thread::{AccessError, ThreadId}; + +use alloc::collections::VecDeque; +use alloc::fmt; +use async_task::{Builder, Runnable, Task}; +use bevy_platform::prelude::Vec; +use bevy_platform::sync::{Mutex, PoisonError, RwLock, TryLockError}; +use concurrent_queue::ConcurrentQueue; +use futures_lite::{future,FutureExt}; +use slab::Slab; +use thread_local::ThreadLocal; +use crossbeam_utils::CachePadded; + +// ThreadLocalState *must* stay `Sync` due to a currently existing soundness hole. +// See: https://github.com/Amanieu/thread_local-rs/issues/75 +static THREAD_LOCAL_STATE: ThreadLocal = ThreadLocal::new(); + +pub(crate) fn install_runtime_into_current_thread(executor: &'static Executor) { + // Use LOCAL_QUEUE here to set the thread destructor + LOCAL_QUEUE.with(|_| { + let tls = THREAD_LOCAL_STATE.get_or_default(); + let state_ptr: *const State = &executor.state; + tls.executor.swap(state_ptr.cast_mut(), Ordering::Relaxed); + }); +} + +std::thread_local! { + static LOCAL_QUEUE: CachePadded> = const { + CachePadded::new(UnsafeCell::new(LocalQueue { + local_queue: VecDeque::new(), + local_active: Slab::new(), + })) + }; +} + +/// # Safety +/// This must not be accessed at the same time as `LOCAL_QUEUE` in any way. +#[inline(always)] +unsafe fn try_with_local_queue(f: impl FnOnce(&mut LocalQueue) -> T) -> Result { + LOCAL_QUEUE.try_with(|tls| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. The caller guarantees that this function is not used with + // LOCAL_QUEUE in any way. + f(unsafe { &mut *tls.get() }) + }) +} + +struct LocalQueue { + local_queue: VecDeque, + local_active: Slab, +} + +impl Drop for LocalQueue { + fn drop(&mut self) { + for waker in self.local_active.drain() { + waker.wake(); + } + + while self.local_queue.pop_front().is_some() {} + } +} + +struct ThreadLocalState { + executor: AtomicPtr, + stealable_queue: ConcurrentQueue, + thread_locked_queue: ConcurrentQueue, +} + +impl Default for ThreadLocalState { + fn default() -> Self { + Self { + executor: AtomicPtr::new(core::ptr::null_mut()), + stealable_queue: ConcurrentQueue::bounded(512), + thread_locked_queue: ConcurrentQueue::unbounded(), + } + } +} + +/// A task spawner for a specific thread. Must be created by calling [`TaskPool::current_thread_spawner`] +/// from the target thread. +/// +/// [`TaskPool::current_thread_spawner`]: crate::TaskPool::current_thread_spawner +#[derive(Clone, Debug)] +pub struct LocalTaskSpawner { + thread_id: ThreadId, + target_queue: &'static ConcurrentQueue, + state: &'static State, +} + +impl LocalTaskSpawner { + /// Spawns a task onto the specific target thread. + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + // SAFETY: T and `future` are both 'static, so the Task is guaranteed to not outlive it. + unsafe { self.spawn_scoped(future) } + } + + /// Spawns a task onto the executor. + /// + /// # Safety + /// The caller must ensure that the returned Task does not outlive 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'a>( + &self, + future: impl Future + Send + 'a, + ) -> Task { + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is `Send`. Therefore we do not need to worry about what thread + // the produced `Runnable` is used and dropped from. + // - `future` is not `'static`, but the caller must make sure that the Task + // and thus the `Runnable` will not outlive `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + + // Instead of directly scheduling this task, it's put into the onto the + // thread locked queue to be moved to the target thread, where it will + // either be run immediately or flushed into the thread's local queue. + let result = self.target_queue.push(runnable); + debug_assert!(result.is_ok()); + task + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let thread_id = self.thread_id; + let state = self.state; + + move |runnable| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + if unsafe { try_with_local_queue(|tls| tls.local_queue.push_back(runnable)) }.is_ok() { + state.notify_specific_thread(thread_id, false); + } + } + } +} + +/// An async executor. +pub struct Executor { + /// The executor state. + state: State, +} + +impl UnwindSafe for Executor {} +impl RefUnwindSafe for Executor {} + +impl fmt::Debug for Executor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "Executor", f) + } +} + +impl Executor { + /// Creates a new executor. + pub const fn new() -> Executor { + Executor { + state: State::new() + } + } + + /// Spawns a 'static and Send task onto the executor. + pub fn spawn(&'static self, future: impl Future + Send + 'static) -> Task { + // SAFETY: Both `T` and `future` are 'static. + unsafe { self.spawn_scoped(future) } + } + + /// Spawns a non-'static Send task onto the executor. + /// + /// # Safety + /// The caller must ensure that the returned Task does not outlive 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'a>(&'static self, future: impl Future + Send + 'a) -> Task { + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is `Send`. Therefore we do not need to worry about what thread + // the produced `Runnable` is used and dropped from. + // - `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + + // `Runnable::schedule ` has a extra extraneous Waker clone/drop if the schedule captures + // variables, so directly schedule here instead. + Self::schedule_runnable(&self.state, runnable); + task + } + + /// Spawns a non-Send task onto the executor. + pub fn spawn_local(&'static self, future: impl Future + 'static) -> Task { + // SAFETY: future is 'static + unsafe { self.spawn_local_scoped(future) } + } + + /// Spawns a non-'static and non-Send task onto the executor. + /// + /// # Panics + /// - This function will panic if any of the internal allocations causes an OOM (out of memory) error. + /// - This function will panic if the current thread's destructor has already been called. + /// + /// # Safety + /// The caller must ensure that the returned Task does not outlive 'a. + pub unsafe fn spawn_local_scoped<'a, T: 'a>( + &'static self, + future: impl Future + 'a, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. + // + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + unsafe { + try_with_local_queue(|tls| { + let entry = tls.local_active.vacant_entry(); + let index = entry.key(); + let builder = Builder::new().propagate_panic(true); + + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. This AsyncCallOnDrop will be + // invoked after the surrounding scope has exited in either a + // `try_tick_local` or `run` call. + let future = AsyncCallOnDrop::new(future, move || { + try_with_local_queue(|tls| drop(tls.local_active.try_remove(index))).ok(); + }); + + // This is a critical section which will result in UB by aliasing active + // if the AsyncCallOnDrop is called while still in this function. + // + // To avoid this, this guard will abort the process if it does + // panic. Rust's drop order will ensure that this will run before + // executor, and thus before the above AsyncCallOnDrop is dropped. + let _panic_guard = AbortOnPanic; + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // - `future` is not `Send`, but the produced `Runnable` does is bound + // to thread-local storage and thus cannot leave this thread of execution. + // - `future` may not be `'static`, but the caller is required to ensure that + // the future does not outlive the borrowed non-metadata variables of the + // task. + // - `self.schedule_local()` is not `Send` or `Sync` so all instances + // must not leave the current thread of execution, and it does not + // all of them are bound vy use of thread-local storage. + // - `self.schedule_local()` is `'static`, as checked below. + let (runnable, task) = builder + .spawn_unchecked(|()| future, self.schedule_local()); + entry.insert(runnable.waker()); + + mem::forget(_panic_guard); + + // Runnable::schedule has a extra extraneous Waker clone/drop if the schedule captures + // variables, so directly schedule here instead. + Self::schedule_runnable_local(&self.state, tls, runnable); + + task + }).unwrap() + } + } + + pub fn current_thread_spawner(&'static self) -> LocalTaskSpawner { + LocalTaskSpawner { + thread_id: std::thread::current().id(), + target_queue: &THREAD_LOCAL_STATE.get_or_default().thread_locked_queue, + state: &self.state, + } + } + + pub fn try_tick_local() -> bool { + // SAFETY: There are no instances where the value is accessed mutably + // from multiple locations simultaneously. As the Runnable is run after + // this scope closes, the AsyncCallOnDrop around the future will be invoked + // without overlapping mutable accssses. + unsafe { try_with_local_queue(|tls| tls.local_queue.pop_front()) } + .ok() + .flatten() + .map(Runnable::run) + .is_some() + } + + /// Runs the executor until the given future completes. + pub fn run<'b, T>(&'static self, future: impl Future + 'b) -> impl Future + 'b { + let mut runner = Runner::new(&self.state); + + // A future that runs tasks forever. + let run_forever = async move { + let mut rng = fastrand::Rng::new(); + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever) + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = &self.state; + + move |runnable| { + Self::schedule_runnable(state, runnable); + } + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule_local(&'static self) -> impl Fn(Runnable) + 'static { + let state = &self.state; + move |runnable| { + // SAFETY: This value is in thread local storage and thus can only be accessed + // from one thread. There are no instances where the value is accessed mutably + // from multiple locations simultaneously. + unsafe { + // If this returns Err, the thread's destructor has been called and thus it's meaningless + // to push onto the queue. + let _ = try_with_local_queue(|tls| { + Self::schedule_runnable_local(state, tls, runnable); + }); + } + } + } + + #[inline] + fn schedule_runnable(state: &State, runnable: Runnable) { + // Attempt to push onto the local queue first in dedicated executor threads, + // because we know that this thread is awake and always processing new tasks. + let runnable = if let Some(local_state) = THREAD_LOCAL_STATE.get() { + if core::ptr::eq(local_state.executor.load(Ordering::Relaxed), state) { + match local_state.stealable_queue.push(runnable) { + Ok(()) => { + state.notify_specific_thread(std::thread::current().id(), true); + return; + } + Err(r) => r.into_inner(), + } + } else { + runnable + } + } else { + runnable + }; + // Otherwise push onto the global queue instead. + let result = state.queue.push(runnable); + debug_assert!(result.is_ok()); + state.notify(); + } + + #[inline] + fn schedule_runnable_local(state: &State, tls: &mut LocalQueue, runnable: Runnable) { + tls.local_queue.push_back(runnable); + state.notify_specific_thread(std::thread::current().id(), false); + } +} + +/// The state of a executor. +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + stealer_queues: RwLock>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, +} + +impl State { + /// Creates state for a new executor. + const fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + stealer_queues: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let waker = self.sleepers.lock().unwrap_or_else(PoisonError::into_inner).notify(); + if let Some(w) = waker { + w.wake(); + } + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify_specific_thread(&self, thread_id: ThreadId, allow_stealing: bool) { + let mut sleepers = self.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + let mut waker = sleepers.notify_specific_thread(thread_id); + if waker.is_none() + && allow_stealing + && self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + waker = sleepers.notify(); + } + if let Some(w) = waker { + w.wake(); + } + } +} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(self, "State", f) + } +} + +/// A sleeping ticker +struct Sleeper { + id: usize, + thread_id: ThreadId, + waker: Waker, +} + +/// A list of sleeping tickers. +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = self.free_ids.pop().unwrap_or_else(|| self.count + 1); + self.count += 1; + self.wakers.push(Sleeper { + id, + thread_id: std::thread::current().id(), + waker: waker.clone() + }); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.id == id { + item.waker.clone_from(waker); + return false; + } + } + + self.wakers.push(Sleeper { + id, + thread_id: std::thread::current().id(), + waker: waker.clone() + }); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].id == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.waker) + } else { + None + } + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify_specific_thread(&mut self, thread_id: ThreadId) -> Option { + for i in 0..self.wakers.len() { + if self.wakers[i].thread_id == thread_id { + let sleeper = self.wakers.remove(i); + return Some(sleeper.waker); + } + } + None + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. + fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + + match self.sleeping { + // Move to sleeping state. + 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + /// + /// # Safety + /// Caller must not access `LOCAL_QUEUE` either directly or with `try_with_local_queue` in any way inside `search`. + unsafe fn runnable_with(&mut self, mut search: impl FnMut(&mut LocalQueue) -> Option) -> impl Future { + future::poll_fn(move |cx| { + // SAFETY: Caller must ensure that there's no instances where LOCAL_QUEUE is accessed mutably + // from multiple locations simultaneously. + unsafe { + try_with_local_queue(|tls| { + loop { + match search(tls) { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + return Poll::Ready(r); + } + } + } + }).unwrap_or(Poll::Pending) + } + }) + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock().unwrap_or_else(PoisonError::into_inner); + let notified = sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +struct Runner<'a> { + /// The executor state. + state: &'a State, + + /// Inner ticker. + ticker: Ticker<'a>, + + /// Bumped every time a runnable task is found. + ticks: usize, + + // The thread local state of the executor for the current thread. + local_state: &'a ThreadLocalState, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let local_state = THREAD_LOCAL_STATE.get_or_default(); + let runner = Runner { + state, + ticker: Ticker::new(state), + ticks: 0, + local_state, + }; + state + .stealer_queues + .write() + .unwrap_or_else(PoisonError::into_inner) + .push(&local_state.stealable_queue); + runner + } + + /// Waits for the next runnable task to run. + fn runnable(&mut self, _rng: &mut fastrand::Rng) -> impl Future { + // SAFETY: The provided search function does not access LOCAL_QUEUE in any way, and thus cannot + // alias. + let runnable = unsafe { + self + .ticker + .runnable_with(|tls| { + if let Some(r) = tls.local_queue.pop_back() { + return Some(r); + } + + crate::cfg::multi_threaded! { + // Try the local queue. + if let Ok(r) = self.local_state.stealable_queue.pop() { + return Some(r); + } + + // Try stealing from the global queue. + if let Ok(r) = self.state.queue.pop() { + steal(&self.state.queue, &self.local_state.stealable_queue); + return Some(r); + } + + // Try stealing from other runners. + if let Ok(stealer_queues) = self.state.stealer_queues.try_read() { + // Pick a random starting point in the iterator list and rotate the list. + let n = stealer_queues.len(); + let start = _rng.usize(..n); + let iter = stealer_queues + .iter() + .chain(stealer_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = + iter.filter(|local| !core::ptr::eq(**local, &self.local_state.stealable_queue)); + + // Try stealing from each local queue in the list. + for local in iter { + steal(*local, &self.local_state.stealable_queue); + if let Ok(r) = self.local_state.stealable_queue.pop() { + return Some(r); + } + } + } + + if let Ok(r) = self.local_state.thread_locked_queue.pop() { + // Do not steal from this queue. If other threads steal + // from this current thread, the task will be moved. + // + // Instead, flush all queued tasks into the local queue to + // minimize the effort required to scan for these tasks. + flush_to_local(&self.local_state.thread_locked_queue, tls); + return Some(r); + } + } + + None + }) + }; + + // Bump the tick counter. + self.ticks = self.ticks.wrapping_add(1); + + if self.ticks.is_multiple_of(64) { + // Steal tasks from the global queue to ensure fair task scheduling. + steal(&self.state.queue, &self.local_state.stealable_queue); + } + + runnable + } +} + +impl Drop for Runner<'_> { + fn drop(&mut self) { + // Remove the local queue. + { + let mut stealer_queues = self.state.stealer_queues.write().unwrap(); + if let Some((idx, _)) = stealer_queues + .iter() + .enumerate() + .rev() + .find(|(_, local)| core::ptr::eq(**local, &self.local_state.stealable_queue)) + { + stealer_queues.remove(idx); + } + } + + // Re-schedule remaining tasks in the local queue. + while let Ok(r) = self.local_state.stealable_queue.pop() { + r.schedule(); + } + } +} + +/// Steals some items from one queue into another. +fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { + // Half of `src`'s length rounded up. + let mut count = src.len(); + + if count > 0 { + if let Some(capacity) = dest.capacity() { + // Don't steal more than fits into the queue. + count = count.min(capacity- dest.len()); + } + + // Steal tasks. + for _ in 0..count { + let Ok(val) = src.pop() else { break }; + assert!(dest.push(val).is_ok()); + } + } +} + +fn flush_to_local(src: &ConcurrentQueue, dst: &mut LocalQueue) { + let count = src.len(); + + if count > 0 { + // Steal tasks. + for _ in 0..count { + let Ok(val) = src.pop() else { break }; + dst.local_queue.push_front(val); + } + } +} + +/// Debug implementation for `Executor`. +fn debug_executor(executor: &Executor, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(&executor.state, name, f) +} + +/// Debug implementation for `Executor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a Mutex>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.len(), f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f), + } + } + } + + /// Debug wrapper for the local runners. + struct LocalRunners<'a>(&'a RwLock>>); + + impl fmt::Debug for LocalRunners<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_read() { + Ok(lock) => f + .debug_list() + .entries(lock.iter().map(|queue| queue.len())) + .finish(), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a Mutex); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Ok(lock) => fmt::Debug::fmt(&lock.count, f), + Err(TryLockError::WouldBlock) => f.write_str(""), + Err(TryLockError::Poisoned(_)) => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("global_tasks", &state.queue.len()) + .field("stealer_queues", &LocalRunners(&state.stealer_queues)) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} + +struct AbortOnPanic; + +impl Drop for AbortOnPanic { + fn drop(&mut self) { + // Panicking while unwinding will force an abort. + panic!("Aborting due to allocator error"); + } +} + +/// Runs a closure when dropped. +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} + +pin_project_lite::pin_project! { + /// A wrapper around a future, running a closure when dropped. + struct AsyncCallOnDrop { + #[pin] + future: Fut, + cleanup: CallOnDrop, + } +} + +impl AsyncCallOnDrop { + fn new(future: Fut, cleanup: Cleanup) -> Self { + Self { + future, + cleanup: CallOnDrop(cleanup), + } + } +} + +impl Future for AsyncCallOnDrop { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) + } +} + +#[cfg(test)] +mod test { + use super::*; + use super::THREAD_LOCAL_STATE; + use alloc::{string::String, boxed::Box}; + use futures_lite::{future, pin}; + use async_task::Task; + use core::time::Duration; + + static EX: Executor = Executor::new(); + + fn _ensure_send_and_sync() { + fn is_send(_: T) {} + fn is_sync(_: T) {} + fn is_static(_: T) {} + + is_send::(Executor::new()); + is_sync::(Executor::new()); + + is_send(EX.schedule()); + is_sync(EX.schedule()); + is_static(EX.schedule()); + is_send(EX.current_thread_spawner()); + is_sync(EX.current_thread_spawner()); + is_send(THREAD_LOCAL_STATE.get_or_default()); + is_sync(THREAD_LOCAL_STATE.get_or_default()); + } + + #[test] + fn await_task_after_dropping_executor() { + let s: String = "hello".into(); + + // SAFETY: We make sure that the task does not outlive the borrow on `s`. + let task: Task<&str> = unsafe { EX.spawn_scoped(async { &*s }) }; + future::block_on(EX.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + assert_eq!(future::block_on(task), "hello"); + drop(s); + } + + fn do_run>(mut f: impl FnMut(&'static Executor) -> Fut) { + // This should not run for longer than two minutes. + #[cfg(not(miri))] + let _stop_timeout = { + let (stop_timeout, stopper) = async_channel::bounded::<()>(1); + std::thread::spawn(move || { + future::block_on(async move { + #[expect(clippy::print_stderr, reason = "Explicitly used to warn about timed out tests")] + let timeout = async { + async_io::Timer::after(Duration::from_secs(2 * 60)).await; + std::eprintln!("test timed out after 2m"); + std::process::exit(1) + }; + + let _ = stopper.recv().or(timeout).await; + }); + }); + stop_timeout + }; + + // Test 1: Use the `run` command. + future::block_on(EX.run(f(&EX))); + + // Test 2: Run on many threads. + std::thread::scope(|scope| { + let (_signal, shutdown) = async_channel::bounded::<()>(1); + + for _ in 0..16 { + let shutdown = shutdown.clone(); + let ex = &EX; + scope.spawn(move || future::block_on(ex.run(shutdown.recv()))); + } + + future::block_on(f(&EX)); + }); + } + + #[test] + fn smoke() { + do_run(|ex| async move { ex.spawn(async {}).await }); + } + + #[test] + fn yield_now() { + do_run(|ex| async move { ex.spawn(future::yield_now()).await }); + } + + #[test] + fn timer() { + do_run(|ex| async move { + ex.spawn(async_io::Timer::after(Duration::from_millis(5))) + .await; + }); + } + + #[test] + fn test_panic_propagation() { + let task = EX.spawn(async { panic!("should be caught by the task") }); + + // Running the executor should not panic. + future::block_on(EX.run(async { + for _ in 0..10 { + future::yield_now().await; + } + })); + + // Polling the task should. + assert!(future::block_on(task.catch_unwind()).is_err()); + } + + #[test] + fn two_queues() { + future::block_on(async { + // Create an executor with two runners. + let (run1, run2) = ( + EX.run(future::pending::<()>()), + EX.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); + } +} diff --git a/crates/bevy_tasks/src/edge_executor.rs b/crates/bevy_tasks/src/edge_executor.rs index a8c80725cafe9..55206bee856d6 100644 --- a/crates/bevy_tasks/src/edge_executor.rs +++ b/crates/bevy_tasks/src/edge_executor.rs @@ -1,8 +1,7 @@ -//! Alternative to `async_executor` based on [`edge_executor`] by Ivan Markov. +//! Alternative to `bevy_executor` based on [`edge_executor`] by Ivan Markov. //! //! It has been vendored along with its tests to update several outdated dependencies. //! -//! [`async_executor`]: https://github.com/smol-rs/async-executor //! [`edge_executor`]: https://github.com/ivmarkov/edge-executor #![expect(unsafe_code, reason = "original implementation relies on unsafe")] @@ -13,16 +12,14 @@ // TODO: Create a more tailored replacement, possibly integrating [Fotre](https://github.com/NthTensor/Forte) -use alloc::rc::Rc; use core::{ future::{poll_fn, Future}, - marker::PhantomData, task::{Context, Poll}, }; use async_task::{Runnable, Task}; use atomic_waker::AtomicWaker; -use bevy_platform::sync::{Arc, LazyLock}; +use bevy_platform::sync::LazyLock; use futures_lite::FutureExt; /// An async executor. @@ -49,12 +46,11 @@ use futures_lite::FutureExt; /// drop(signal); /// })); /// ``` -pub struct Executor<'a, const C: usize = 64> { - state: LazyLock>>, - _invariant: PhantomData>, +pub struct Executor { + state: LazyLock>, } -impl<'a, const C: usize> Executor<'a, C> { +impl Executor { /// Creates a new executor. /// /// # Examples @@ -66,8 +62,7 @@ impl<'a, const C: usize> Executor<'a, C> { /// ``` pub const fn new() -> Self { Self { - state: LazyLock::new(|| Arc::new(State::new())), - _invariant: PhantomData, + state: LazyLock::new(|| State::new()), } } @@ -88,7 +83,16 @@ impl<'a, const C: usize> Executor<'a, C> { /// Note that if the executor's queue size is equal to the number of currently /// spawned and running tasks, spawning this additional task might cause the executor to panic /// later, when the task is scheduled for polling. - pub fn spawn(&self, fut: F) -> Task + pub fn spawn(&'static self, fut: F) -> Task + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + // SAFETY: Original implementation missing safety documentation + unsafe { self.spawn_unchecked(fut) } + } + + pub unsafe fn spawn_scoped<'a, F>(&'static self, fut: F) -> Task where F: Future + Send + 'a, F::Output: Send + 'a, @@ -97,6 +101,24 @@ impl<'a, const C: usize> Executor<'a, C> { unsafe { self.spawn_unchecked(fut) } } + pub fn spawn_local(&'static self, fut: F) -> Task + where + F: Future + 'static, + F::Output: 'static, + { + // SAFETY: Original implementation missing safety documentation + unsafe { self.spawn_unchecked(fut) } + } + + pub unsafe fn spawn_local_scoped<'a, F>(&'static self, fut: F) -> Task + where + F: Future + 'a, + F::Output: 'a, + { + // SAFETY: Original implementation missing safety documentation + unsafe { self.spawn_unchecked(fut) } + } + /// Attempts to run a task if at least one is scheduled. /// /// Running a scheduled task means simply polling its future once. @@ -160,9 +182,9 @@ impl<'a, const C: usize> Executor<'a, C> { /// /// assert_eq!(res, 6); /// ``` - pub async fn run(&self, fut: F) -> F::Output + pub async fn run<'a, F>(&'static self, fut: F) -> F::Output where - F: Future + Send + 'a, + F: Future + 'a, { // SAFETY: Original implementation missing safety documentation unsafe { self.run_unchecked(fut).await } @@ -175,7 +197,7 @@ impl<'a, const C: usize> Executor<'a, C> { /// Polls the first task scheduled for execution by the executor. fn poll_runnable(&self, ctx: &Context<'_>) -> Poll { - self.state().waker.register(ctx.waker()); + self.state.waker.register(ctx.waker()); if let Some(runnable) = self.try_runnable() { Poll::Ready(runnable) @@ -201,7 +223,7 @@ impl<'a, const C: usize> Executor<'a, C> { target_has_atomic = "ptr" ))] { - runnable = self.state().queue.pop(); + runnable = self.state.queue.pop().ok(); } #[cfg(not(all( @@ -212,7 +234,7 @@ impl<'a, const C: usize> Executor<'a, C> { target_has_atomic = "ptr" )))] { - runnable = self.state().queue.dequeue(); + runnable = self.state.queue.dequeue(); } runnable @@ -221,12 +243,12 @@ impl<'a, const C: usize> Executor<'a, C> { /// # Safety /// /// Original implementation missing safety documentation - unsafe fn spawn_unchecked(&self, fut: F) -> Task + unsafe fn spawn_unchecked(&'static self, fut: F) -> Task where F: Future, { let schedule = { - let state = self.state().clone(); + let state = &self.state; move |runnable| { #[cfg(all( @@ -280,158 +302,18 @@ impl<'a, const C: usize> Executor<'a, C> { run_forever.or(fut).await } - - /// Returns a reference to the inner state. - fn state(&self) -> &Arc> { - &self.state - } } -impl<'a, const C: usize> Default for Executor<'a, C> { +impl Default for Executor { fn default() -> Self { Self::new() } } // SAFETY: Original implementation missing safety documentation -unsafe impl<'a, const C: usize> Send for Executor<'a, C> {} +unsafe impl Send for Executor {} // SAFETY: Original implementation missing safety documentation -unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {} - -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -/// -/// # Examples -/// -/// ```ignore -/// use edge_executor::{LocalExecutor, block_on}; -/// -/// let local_ex: LocalExecutor = Default::default(); -/// -/// block_on(local_ex.run(async { -/// println!("Hello world!"); -/// })); -/// ``` -pub struct LocalExecutor<'a, const C: usize = 64> { - executor: Executor<'a, C>, - _not_send: PhantomData>>, -} - -impl<'a, const C: usize> LocalExecutor<'a, C> { - /// Creates a single-threaded executor. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// ``` - pub const fn new() -> Self { - Self { - executor: Executor::::new(), - _not_send: PhantomData, - } - } - - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - /// - /// Note that if the executor's queue size is equal to the number of currently - /// spawned and running tasks, spawning this additional task might cause the executor to panic - /// later, when the task is scheduled for polling. - pub fn spawn(&self, fut: F) -> Task - where - F: Future + 'a, - F::Output: 'a, - { - // SAFETY: Original implementation missing safety documentation - unsafe { self.executor.spawn_unchecked(fut) } - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::LocalExecutor; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// assert!(!local_ex.try_tick()); // no tasks to run - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(local_ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.executor.try_tick() - } - - /// Runs a single task asynchronously. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::{LocalExecutor, block_on}; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// block_on(local_ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.executor.tick().await; - } - - /// Runs the executor asynchronously until the given future completes. - /// - /// # Examples - /// - /// ```ignore - /// use edge_executor::{LocalExecutor, block_on}; - /// - /// let local_ex: LocalExecutor = Default::default(); - /// - /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = block_on(local_ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, fut: F) -> F::Output - where - F: Future, - { - // SAFETY: Original implementation missing safety documentation - unsafe { self.executor.run_unchecked(fut) }.await - } -} - -impl<'a, const C: usize> Default for LocalExecutor<'a, C> { - fn default() -> Self { - Self::new() - } -} +unsafe impl Sync for Executor {} struct State { #[cfg(all( @@ -441,7 +323,7 @@ struct State { target_has_atomic = "64", target_has_atomic = "ptr" ))] - queue: crossbeam_queue::ArrayQueue, + queue: concurrent_queue::ConcurrentQueue, #[cfg(not(all( target_has_atomic = "8", target_has_atomic = "16", @@ -463,7 +345,7 @@ impl State { target_has_atomic = "64", target_has_atomic = "ptr" ))] - queue: crossbeam_queue::ArrayQueue::new(C), + queue: concurrent_queue::ConcurrentQueue::bounded(C), #[cfg(not(all( target_has_atomic = "8", target_has_atomic = "16", @@ -477,46 +359,6 @@ impl State { } } -#[cfg(test)] -mod different_executor_tests { - use core::cell::Cell; - - use bevy_tasks::{block_on, futures_lite::{pending, poll_once}}; - use futures_lite::pin; - - use super::LocalExecutor; - - #[test] - fn shared_queue_slot() { - block_on(async { - let was_polled = Cell::new(false); - let future = async { - was_polled.set(true); - pending::<()>().await; - }; - - let ex1: LocalExecutor = Default::default(); - let ex2: LocalExecutor = Default::default(); - - // Start the futures for running forever. - let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>())); - pin!(run1); - pin!(run2); - assert!(poll_once(run1.as_mut()).await.is_none()); - assert!(poll_once(run2.as_mut()).await.is_none()); - - // Spawn the future on executor one and then poll executor two. - ex1.spawn(future).detach(); - assert!(poll_once(run2).await.is_none()); - assert!(!was_polled.get()); - - // Poll the first one. - assert!(poll_once(run1).await.is_none()); - assert!(was_polled.get()); - }); - } -} - #[cfg(test)] mod drop_tests { use alloc::string::String; @@ -533,7 +375,7 @@ mod drop_tests { #[test] fn leaked_executor_leaks_everything() { static DROP: AtomicUsize = AtomicUsize::new(0); - static WAKER: LazyLock>> = LazyLock::new(Default::default); + static WAKER: Mutex> = Mutex::new(None); let ex: Executor = Default::default(); diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs deleted file mode 100644 index fcce0e2985536..0000000000000 --- a/crates/bevy_tasks/src/executor.rs +++ /dev/null @@ -1,86 +0,0 @@ -//! Provides a fundamental executor primitive appropriate for the target platform -//! and feature set selected. -//! By default, the `async_executor` feature will be enabled, which will rely on -//! [`async-executor`] for the underlying implementation. This requires `std`, -//! so is not suitable for `no_std` contexts. Instead, you must use `edge_executor`, -//! which relies on the alternate [`edge-executor`] backend. -//! -//! [`async-executor`]: https://crates.io/crates/async-executor -//! [`edge-executor`]: https://crates.io/crates/edge-executor - -use core::{ - fmt, - panic::{RefUnwindSafe, UnwindSafe}, -}; -use derive_more::{Deref, DerefMut}; - -crate::cfg::async_executor! { - if { - type ExecutorInner<'a> = async_executor::Executor<'a>; - type LocalExecutorInner<'a> = async_executor::LocalExecutor<'a>; - } else { - type ExecutorInner<'a> = crate::edge_executor::Executor<'a, 64>; - type LocalExecutorInner<'a> = crate::edge_executor::LocalExecutor<'a, 64>; - } -} - -crate::cfg::multi_threaded! { - pub use async_task::FallibleTask; -} - -/// Wrapper around a multi-threading-aware async executor. -/// Spawning will generally require tasks to be `Send` and `Sync` to allow multiple -/// threads to send/receive/advance tasks. -/// -/// If you require an executor _without_ the `Send` and `Sync` requirements, consider -/// using [`LocalExecutor`] instead. -#[derive(Deref, DerefMut, Default)] -pub struct Executor<'a>(ExecutorInner<'a>); - -/// Wrapper around a single-threaded async executor. -/// Spawning wont generally require tasks to be `Send` and `Sync`, at the cost of -/// this executor itself not being `Send` or `Sync`. This makes it unsuitable for -/// global statics. -/// -/// If need to store an executor in a global static, or send across threads, -/// consider using [`Executor`] instead. -#[derive(Deref, DerefMut, Default)] -pub struct LocalExecutor<'a>(LocalExecutorInner<'a>); - -impl Executor<'_> { - /// Construct a new [`Executor`] - #[expect(clippy::allow_attributes, reason = "This lint may not always trigger.")] - #[allow(dead_code, reason = "not all feature flags require this function")] - pub const fn new() -> Self { - Self(ExecutorInner::new()) - } -} - -impl LocalExecutor<'_> { - /// Construct a new [`LocalExecutor`] - #[expect(clippy::allow_attributes, reason = "This lint may not always trigger.")] - #[allow(dead_code, reason = "not all feature flags require this function")] - pub const fn new() -> Self { - Self(LocalExecutorInner::new()) - } -} - -impl UnwindSafe for Executor<'_> {} - -impl RefUnwindSafe for Executor<'_> {} - -impl UnwindSafe for LocalExecutor<'_> {} - -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl fmt::Debug for Executor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Executor").finish() - } -} - -impl fmt::Debug for LocalExecutor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LocalExecutor").finish() - } -} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7ec4f3748c692..cc0ad2d504370 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -13,9 +13,9 @@ pub mod cfg { pub use bevy_platform::cfg::{alloc, std, web}; define_alias! { - #[cfg(feature = "async_executor")] => { - /// Indicates `async_executor` is used as the future execution backend. - async_executor + #[cfg(feature = "bevy_executor")] => { + /// Indicates `bevy_executor` is used as the future execution backend. + bevy_executor } #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] => { @@ -72,15 +72,16 @@ use alloc::boxed::Box; pub type BoxedFuture<'a, T> = core::pin::Pin + 'a>>; // Modules -mod executor; pub mod futures; mod iter; mod slice; mod task; mod usages; -cfg::async_executor! { - if {} else { +cfg::bevy_executor! { + if { + mod bevy_executor; + } else { mod edge_executor; } } @@ -94,23 +95,15 @@ pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; pub use futures_lite; pub use futures_lite::future::poll_once; -cfg::web! { - if {} else { - pub use usages::tick_global_task_pools_on_main_thread; - } -} - cfg::multi_threaded! { if { mod task_pool; - mod thread_executor; - pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; - pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; + pub use task_pool::{Scope, TaskPool, TaskPoolBuilder, LocalTaskSpawner}; } else { mod single_threaded_task_pool; - pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; + pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, LocalTaskSpawner}; } } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index d81e43b4e91b9..1079079f67197 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,28 +1,19 @@ -use alloc::{string::String, vec::Vec}; -use bevy_platform::sync::Arc; +use alloc::{string::String, vec::Vec, fmt}; use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem}; +use crate::futures::now_or_never; -use crate::executor::LocalExecutor; use crate::{block_on, Task}; -crate::cfg::std! { - if { - use std::thread_local; - - use crate::executor::LocalExecutor as Executor; - - thread_local! { - static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() }; - } +crate::cfg::bevy_executor! { + if { + use crate::bevy_executor::Executor; } else { - - // Because we do not have thread-locals without std, we cannot use LocalExecutor here. - use crate::executor::Executor; - - static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() }; + use crate::edge_executor::Executor; } } +static EXECUTOR: Executor = const { Executor::new() }; + /// Used to create a [`TaskPool`]. #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} @@ -31,15 +22,9 @@ pub struct TaskPoolBuilder {} /// task pool. In the case of the multithreaded task pool this struct is used to spawn /// tasks on a specific thread. But the wasm task pool just calls /// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread -/// and so the [`ThreadExecutor`] does nothing. -#[derive(Default)] -pub struct ThreadExecutor<'a>(PhantomData<&'a ()>); -impl<'a> ThreadExecutor<'a> { - /// Creates a new `ThreadExecutor` - pub fn new() -> Self { - Self::default() - } -} +/// and so the [`LocalTaskSpawner`] does nothing. +#[derive(Clone)] +pub struct LocalTaskSpawner; impl TaskPoolBuilder { /// Creates a new `TaskPoolBuilder` instance @@ -76,6 +61,10 @@ impl TaskPoolBuilder { pub fn build(self) -> TaskPool { TaskPool::new_internal() } + + pub(crate) fn build_static(self, _executor: &'static Executor) -> TaskPool { + TaskPool::new_internal() + } } /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by @@ -85,8 +74,8 @@ pub struct TaskPool {} impl TaskPool { /// Just create a new `ThreadExecutor` for wasm - pub fn get_thread_executor() -> Arc> { - Arc::new(ThreadExecutor::new()) + pub fn current_thread_spawner(&self) -> LocalTaskSpawner { + LocalTaskSpawner } /// Create a `TaskPool` with the default configuration. @@ -113,7 +102,7 @@ impl TaskPool { F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>), T: Send + 'static, { - self.scope_with_executor(false, None, f) + self.scope_with_executor(None, f) } /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, @@ -124,8 +113,7 @@ impl TaskPool { #[expect(unsafe_code, reason = "Required to transmute lifetimes.")] pub fn scope_with_executor<'env, F, T>( &self, - _tick_task_pool_executor: bool, - _thread_executor: Option<&ThreadExecutor>, + _thread_executor: Option, f: F, ) -> Vec where @@ -140,22 +128,19 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = LocalExecutor::new(); + // Kept around to ensure that, in the case of an unwinding panic, all scheduled Tasks are cancelled. + let tasks: RefCell>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) }; - - let results: RefCell>> = RefCell::new(Vec::new()); - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let results_ref: &'env RefCell>> = unsafe { mem::transmute(&results) }; + let tasks_ref: &'env RefCell>> = unsafe { mem::transmute(&tasks) }; let pending_tasks: Cell = Cell::new(0); // SAFETY: As above, all futures must complete in this function so we can change the lifetime let pending_tasks: &'env Cell = unsafe { mem::transmute(&pending_tasks) }; let mut scope = Scope { - executor_ref, + executor_ref: &EXECUTOR, + tasks_ref, pending_tasks, - results_ref, scope: PhantomData, env: PhantomData, }; @@ -166,16 +151,17 @@ impl TaskPool { f(scope_ref); // Wait until the scope is complete - block_on(executor.run(async { + block_on(EXECUTOR.run(async { while pending_tasks.get() != 0 { futures_lite::future::yield_now().await; } })); - results + tasks .take() .into_iter() - .map(|result| result.unwrap()) + .map(now_or_never) + .map(Option::unwrap) .collect() } @@ -195,20 +181,17 @@ impl TaskPool { crate::cfg::switch! {{ crate::cfg::web => { Task::wrap_future(future) - } - crate::cfg::std => { - LOCAL_EXECUTOR.with(|executor| { - let task = executor.spawn(future); - // Loop until all tasks are done - while executor.try_tick() {} - - Task::new(task) - }) - } + } _ => { - let task = LOCAL_EXECUTOR.spawn(future); + let task = EXECUTOR.spawn_local(future); // Loop until all tasks are done - while LOCAL_EXECUTOR.try_tick() {} + crate::cfg::bevy_executor! { + if { + while !Executor::try_tick_local() {} + } else { + while EXECUTOR.try_tick() {} + } + } Task::new(task) } @@ -225,43 +208,17 @@ impl TaskPool { { self.spawn(future) } - - /// Runs a function with the local executor. Typically used to tick - /// the local executor on the main thread as it needs to share time with - /// other things. - /// - /// ``` - /// use bevy_tasks::TaskPool; - /// - /// TaskPool::new().with_local_executor(|local_executor| { - /// local_executor.try_tick(); - /// }); - /// ``` - pub fn with_local_executor(&self, f: F) -> R - where - F: FnOnce(&Executor) -> R, - { - crate::cfg::switch! {{ - crate::cfg::std => { - LOCAL_EXECUTOR.with(f) - } - _ => { - f(&LOCAL_EXECUTOR) - } - }} - } } /// A `TaskPool` scope for running one or more non-`'static` futures. /// /// For more information, see [`TaskPool::scope`]. -#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor_ref: &'scope LocalExecutor<'scope>, + executor_ref: &'static Executor, // The number of pending tasks spawned on the scope pending_tasks: &'scope Cell, // Vector to gather results of all futures spawned during scope run - results_ref: &'env RefCell>>, + tasks_ref: &'env RefCell>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -301,28 +258,33 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { let pending_tasks = self.pending_tasks; pending_tasks.update(|i| i + 1); - // add a spot to keep the result, and record the index - let results_ref = self.results_ref; - let mut results = results_ref.borrow_mut(); - let task_number = results.len(); - results.push(None); - drop(results); - // create the job closure let f = async move { let result = f.await; - // store the result in the allocated slot - let mut results = results_ref.borrow_mut(); - results[task_number] = Some(result); - drop(results); - // decrement the pending tasks count pending_tasks.update(|i| i - 1); + + result }; - // spawn the job itself - self.executor_ref.spawn(f).detach(); + let mut tasks = self.tasks_ref.borrow_mut(); + + #[expect(unsafe_code, reason = "Executor::spawn_local_scoped is unsafe")] + // SAFETY: The surrounding scope will not terminate until all local tasks are done + // ensuring that the borrowed variables do not outlive the detached task. + tasks.push(unsafe { self.executor_ref.spawn_local_scoped(f) }); + } +} + +impl <'scope, 'env: 'scope, T> fmt::Debug for Scope<'scope, 'env, T> +where T: fmt::Debug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Scope") + .field("pending_tasks", &self.pending_tasks) + .field("tasks_ref", &self.tasks_ref) + .finish() } } diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index e70ab1d18b3c8..73b82b80660c9 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -38,7 +38,9 @@ cfg::web! { spawn_local(async move { // Catch any panics that occur when polling the future so they can // be propagated back to the task handle. - let value = CatchUnwind(AssertUnwindSafe(future)).await; + let value = CatchUnwind { + inner: AssertUnwindSafe(future) + }.await; let _ = sender.send(value).await; }); Self(receiver) @@ -173,13 +175,17 @@ cfg::web! { type Panic = Box; - #[pin_project::pin_project] - struct CatchUnwind(#[pin] F); + pin_project_lite::pin_project! { + struct CatchUnwind { + #[pin] + inner: F + } + } impl Future for CatchUnwind { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let f = AssertUnwindSafe(|| self.project().0.poll(cx)); + let f = AssertUnwindSafe(|| self.project().inner.poll(cx)); let result = cfg::std! { if { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index eb6f8502b3d80..902ac1c379a37 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,20 +1,16 @@ use alloc::{boxed::Box, format, string::String, vec::Vec}; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; -use std::{ - thread::{self, JoinHandle}, - thread_local, -}; +use std::thread::{self, JoinHandle}; -use crate::executor::FallibleTask; +use crate::bevy_executor::Executor; +use async_task::FallibleTask; use bevy_platform::sync::Arc; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; -use crate::{ - block_on, - thread_executor::{ThreadExecutor, ThreadExecutorTicker}, - Task, -}; +use crate::{block_on, Task}; + +pub use crate::bevy_executor::LocalTaskSpawner; struct CallOnDrop(Option>); @@ -117,7 +113,11 @@ impl TaskPoolBuilder { /// Creates a new [`TaskPool`] based on the current options. pub fn build(self) -> TaskPool { - TaskPool::new_internal(self) + TaskPool::new_internal(self, Box::leak(Box::new(Executor::new()))) + } + + pub(crate) fn build_static(self, executor: &'static Executor) -> TaskPool { + TaskPool::new_internal(self, executor) } } @@ -134,7 +134,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. - executor: Arc>, + executor: &'static Executor, // The inner state of the pool. threads: Vec>, @@ -142,14 +142,10 @@ pub struct TaskPool { } impl TaskPool { - thread_local! { - static LOCAL_EXECUTOR: crate::executor::LocalExecutor<'static> = const { crate::executor::LocalExecutor::new() }; - static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); - } - - /// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock - pub fn get_thread_executor() -> Arc> { - Self::THREAD_EXECUTOR.with(Clone::clone) + /// Creates a [`LocalTaskSpawner`] for this current thread of execution. + /// Can be used to spawn new tasks to execute exclusively on this thread. + pub fn current_thread_spawner(&self) -> LocalTaskSpawner { + self.executor.current_thread_spawner() } /// Create a `TaskPool` with the default configuration. @@ -157,18 +153,15 @@ impl TaskPool { TaskPoolBuilder::new().build() } - fn new_internal(builder: TaskPoolBuilder) -> Self { + fn new_internal(builder: TaskPoolBuilder, executor: &'static Executor) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(crate::executor::Executor::new()); - let num_threads = builder .num_threads .unwrap_or_else(crate::available_parallelism); let threads = (0..num_threads) .map(|i| { - let ex = Arc::clone(&executor); let shutdown_rx = shutdown_rx.clone(); let thread_name = if let Some(thread_name) = builder.thread_name.as_deref() { @@ -187,28 +180,22 @@ impl TaskPool { thread_builder .spawn(move || { - TaskPool::LOCAL_EXECUTOR.with(|local_executor| { - if let Some(on_thread_spawn) = on_thread_spawn { - on_thread_spawn(); - drop(on_thread_spawn); - } - let _destructor = CallOnDrop(on_thread_destroy); - loop { - let res = std::panic::catch_unwind(|| { - let tick_forever = async move { - loop { - local_executor.tick().await; - } - }; - block_on(ex.run(tick_forever.or(shutdown_rx.recv()))) - }); - if let Ok(value) = res { - // Use unwrap_err because we expect a Closed error - value.unwrap_err(); - break; - } + crate::bevy_executor::install_runtime_into_current_thread(executor); + + if let Some(on_thread_spawn) = on_thread_spawn { + on_thread_spawn(); + drop(on_thread_spawn); + } + let _destructor = CallOnDrop(on_thread_destroy); + loop { + let res = + std::panic::catch_unwind(|| block_on(executor.run(shutdown_rx.recv()))); + if let Ok(value) = res { + // Use unwrap_err because we expect a Closed error + value.unwrap_err(); + break; } - }); + } }) .expect("Failed to spawn thread.") }) @@ -312,57 +299,39 @@ impl TaskPool { F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|scope_executor| { - self.scope_with_executor_inner(true, scope_executor, scope_executor, f) - }) + let scope_spawner = self.current_thread_spawner(); + self.scope_with_executor_inner(scope_spawner.clone(), scope_spawner, f) } - /// This allows passing an external executor to spawn tasks on. When you pass an external executor - /// [`Scope::spawn_on_scope`] spawns is then run on the thread that [`ThreadExecutor`] is being ticked on. - /// If [`None`] is passed the scope will use a [`ThreadExecutor`] that is ticked on the current thread. - /// - /// When `tick_task_pool_executor` is set to `true`, the multithreaded task stealing executor is ticked on the scope - /// thread. Disabling this can be useful when finishing the scope is latency sensitive. Pulling tasks from - /// global executor can run tasks unrelated to the scope and delay when the scope returns. + /// This allows passing an external [`LocalTaskSpawner`] to spawn tasks to. When you pass an external spawner + /// [`Scope::spawn_on_scope`] spawns is then run on the thread that [`LocalTaskSpawner`] originated from. + /// If [`None`] is passed the scope will use a [`LocalTaskSpawner`] that is ticked on the current thread. /// /// See [`Self::scope`] for more details in general about how scopes work. pub fn scope_with_executor<'env, F, T>( &self, - tick_task_pool_executor: bool, - external_executor: Option<&ThreadExecutor>, + external_spawner: Option, f: F, ) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|scope_executor| { - // If an `external_executor` is passed, use that. Otherwise, get the executor stored - // in the `THREAD_EXECUTOR` thread local. - if let Some(external_executor) = external_executor { - self.scope_with_executor_inner( - tick_task_pool_executor, - external_executor, - scope_executor, - f, - ) - } else { - self.scope_with_executor_inner( - tick_task_pool_executor, - scope_executor, - scope_executor, - f, - ) - } - }) + let scope_spawner = self.executor.current_thread_spawner(); + // If an `external_executor` is passed, use that. Otherwise, get the executor stored + // in the `THREAD_EXECUTOR` thread local. + if let Some(external_spawner) = external_spawner { + self.scope_with_executor_inner(external_spawner, scope_spawner, f) + } else { + self.scope_with_executor_inner(scope_spawner.clone(), scope_spawner, f) + } } #[expect(unsafe_code, reason = "Required to transmute lifetimes.")] fn scope_with_executor_inner<'env, F, T>( &self, - tick_task_pool_executor: bool, - external_executor: &ThreadExecutor, - scope_executor: &ThreadExecutor, + external_spawner: LocalTaskSpawner, + scope_spawner: LocalTaskSpawner, f: F, ) -> Vec where @@ -376,26 +345,17 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor: &crate::executor::Executor = &self.executor; - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env crate::executor::Executor = unsafe { mem::transmute(executor) }; - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let external_executor: &'env ThreadExecutor<'env> = - unsafe { mem::transmute(external_executor) }; - // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let scope_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(scope_executor) }; let spawned: ConcurrentQueue>>> = ConcurrentQueue::unbounded(); // shadow the variable so that the owned value cannot be used for the rest of the function // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let spawned: &'env ConcurrentQueue< - FallibleTask>>, - > = unsafe { mem::transmute(&spawned) }; + let spawned: &'env ConcurrentQueue>>> = + unsafe { mem::transmute(&spawned) }; let scope = Scope { - executor, - external_executor, - scope_executor, + executor: self.executor, + external_spawner, + scope_spawner, spawned, scope: PhantomData, env: PhantomData, @@ -410,144 +370,20 @@ impl TaskPool { if spawned.is_empty() { Vec::new() } else { - block_on(async move { - let get_results = async { - let mut results = Vec::with_capacity(spawned.len()); - while let Ok(task) = spawned.pop() { - if let Some(res) = task.await { - match res { - Ok(res) => results.push(res), - Err(payload) => std::panic::resume_unwind(payload), - } - } else { - panic!("Failed to catch panic!"); - } - } - results - }; - - let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); - - // we get this from a thread local so we should always be on the scope executors thread. - // note: it is possible `scope_executor` and `external_executor` is the same executor, - // in that case, we should only tick one of them, otherwise, it may cause deadlock. - let scope_ticker = scope_executor.ticker().unwrap(); - let external_ticker = if !external_executor.is_same(scope_executor) { - external_executor.ticker() - } else { - None - }; - - match (external_ticker, tick_task_pool_executor) { - (Some(external_ticker), true) => { - Self::execute_global_external_scope( - executor, - external_ticker, - scope_ticker, - get_results, - ) - .await - } - (Some(external_ticker), false) => { - Self::execute_external_scope(external_ticker, scope_ticker, get_results) - .await + block_on(self.executor.run(async move { + let mut results = Vec::with_capacity(spawned.len()); + while let Ok(task) = spawned.pop() { + match task.await { + Some(Ok(res)) => results.push(res), + Some(Err(payload)) => std::panic::resume_unwind(payload), + None => panic!("Failed to catch panic!"), } - // either external_executor is none or it is same as scope_executor - (None, true) => { - Self::execute_global_scope(executor, scope_ticker, get_results).await - } - (None, false) => Self::execute_scope(scope_ticker, get_results).await, } - }) + results + })) } } - #[inline] - async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope crate::executor::Executor<'scope>, - external_ticker: ThreadExecutorTicker<'scope, 'ticker>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - // we restart the executors if a task errors. if a scoped - // task errors it will panic the scope on the call to get_results - let execute_forever = async move { - loop { - let tick_forever = async { - loop { - external_ticker.tick().or(scope_ticker.tick()).await; - } - }; - // we don't care if it errors. If a scoped task errors it will propagate - // to get_results - let _result = AssertUnwindSafe(executor.run(tick_forever)) - .catch_unwind() - .await - .is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_external_scope<'scope, 'ticker, T>( - external_ticker: ThreadExecutorTicker<'scope, 'ticker>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - external_ticker.tick().or(scope_ticker.tick()).await; - } - }; - let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope crate::executor::Executor<'scope>, - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - scope_ticker.tick().await; - } - }; - let _result = AssertUnwindSafe(executor.run(tick_forever)) - .catch_unwind() - .await - .is_ok(); - } - }; - get_results.or(execute_forever).await - } - - #[inline] - async fn execute_scope<'scope, 'ticker, T>( - scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, - get_results: impl Future>, - ) -> Vec { - let execute_forever = async { - loop { - let tick_forever = async { - loop { - scope_ticker.tick().await; - } - }; - let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); - } - }; - get_results.or(execute_forever).await - } - /// Spawns a static future onto the thread pool. The returned [`Task`] is a /// future that can be polled for the result. It can also be canceled and /// "detached", allowing the task to continue running even if dropped. In @@ -578,25 +414,7 @@ impl TaskPool { where T: 'static, { - Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) - } - - /// Runs a function with the local executor. Typically used to tick - /// the local executor on the main thread as it needs to share time with - /// other things. - /// - /// ``` - /// use bevy_tasks::TaskPool; - /// - /// TaskPool::new().with_local_executor(|local_executor| { - /// local_executor.try_tick(); - /// }); - /// ``` - pub fn with_local_executor(&self, f: F) -> R - where - F: FnOnce(&crate::executor::LocalExecutor) -> R, - { - Self::LOCAL_EXECUTOR.with(f) + Task::new(self.executor.spawn_local(future)) } } @@ -625,9 +443,9 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope crate::executor::Executor<'scope>, - external_executor: &'scope ThreadExecutor<'scope>, - scope_executor: &'scope ThreadExecutor<'scope>, + executor: &'static Executor, + external_spawner: LocalTaskSpawner, + scope_spawner: LocalTaskSpawner, spawned: &'scope ConcurrentQueue>>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -635,6 +453,10 @@ pub struct Scope<'scope, 'env: 'scope, T> { } impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { + #[expect( + unsafe_code, + reason = "Executor::spawn otherwise requires 'static Futures" + )] /// Spawns a scoped future onto the thread pool. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -644,15 +466,21 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&self, f: Fut) { - let task = self - .executor - .spawn(AssertUnwindSafe(f).catch_unwind()) - .fallible(); - // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + // SAFETY: The scope call that generated this `Scope` ensures that the created + // Task does not outlive 'scope. + let task = unsafe { + self.executor + .spawn_scoped(AssertUnwindSafe(f).catch_unwind()) + .fallible() + }; + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } + #[expect( + unsafe_code, + reason = "LocalTaskSpawner::spawn otherwise requires 'static Futures" + )] /// Spawns a scoped future onto the thread the scope is run on. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. Users should generally prefer to use @@ -660,15 +488,21 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { - let task = self - .scope_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) - .fallible(); - // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + // SAFETY: The scope call that generated this `Scope` ensures that the created + // Task does not outlive 'scope. + let task = unsafe { + self.scope_spawner + .spawn_scoped(AssertUnwindSafe(f).catch_unwind()) + .fallible() + }; + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } + #[expect( + unsafe_code, + reason = "LocalTaskSpawner::spawn otherwise requires 'static Futures" + )] /// Spawns a scoped future onto the thread of the external thread executor. /// This is typically the main thread. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of @@ -677,13 +511,17 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_external + 'scope + Send>(&self, f: Fut) { - let task = self - .external_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) - .fallible(); + // SAFETY: The scope call that generated this `Scope` ensures that the created + // Task does not outlive 'scope. + let task = unsafe { + self.external_spawner + .spawn_scoped(AssertUnwindSafe(f).catch_unwind()) + .fallible() + }; // ConcurrentQueue only errors when closed or full, but we never - // close and use an unbounded queue, so it is safe to unwrap - self.spawned.push(task).unwrap(); + // close and use an unbounded queue, so pushing should always succeed. + let result = self.spawned.push(task); + debug_assert!(result.is_ok()); } } @@ -740,6 +578,7 @@ mod tests { #[test] fn test_thread_callbacks() { let counter = Arc::new(AtomicI32::new(0)); + static EX: Executor = Executor::new(); let start_counter = counter.clone(); { let barrier = Arc::new(Barrier::new(11)); @@ -751,7 +590,7 @@ mod tests { start_counter.fetch_add(1, Ordering::Relaxed); barrier.clone().wait(); }) - .build(); + .build_static(&EX); last_barrier.wait(); assert_eq!(10, counter.load(Ordering::Relaxed)); } @@ -763,7 +602,7 @@ mod tests { .on_thread_destroy(move || { end_counter.fetch_sub(1, Ordering::Relaxed); }) - .build(); + .build_static(&EX); assert_eq!(10, counter.load(Ordering::Relaxed)); } assert_eq!(-10, counter.load(Ordering::Relaxed)); @@ -781,7 +620,7 @@ mod tests { .on_thread_destroy(move || { end_counter.fetch_sub(1, Ordering::Relaxed); }) - .build(); + .build_static(&EX); last_barrier.wait(); assert_eq!(-5, counter.load(Ordering::Relaxed)); } diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs deleted file mode 100644 index 86d2ab280d87c..0000000000000 --- a/crates/bevy_tasks/src/thread_executor.rs +++ /dev/null @@ -1,133 +0,0 @@ -use core::marker::PhantomData; -use std::thread::{self, ThreadId}; - -use crate::executor::Executor; -use async_task::Task; -use futures_lite::Future; - -/// An executor that can only be ticked on the thread it was instantiated on. But -/// can spawn `Send` tasks from other threads. -/// -/// # Example -/// ``` -/// # use std::sync::{Arc, atomic::{AtomicI32, Ordering}}; -/// use bevy_tasks::ThreadExecutor; -/// -/// let thread_executor = ThreadExecutor::new(); -/// let count = Arc::new(AtomicI32::new(0)); -/// -/// // create some owned values that can be moved into another thread -/// let count_clone = count.clone(); -/// -/// std::thread::scope(|scope| { -/// scope.spawn(|| { -/// // we cannot get the ticker from another thread -/// let not_thread_ticker = thread_executor.ticker(); -/// assert!(not_thread_ticker.is_none()); -/// -/// // but we can spawn tasks from another thread -/// thread_executor.spawn(async move { -/// count_clone.fetch_add(1, Ordering::Relaxed); -/// }).detach(); -/// }); -/// }); -/// -/// // the tasks do not make progress unless the executor is manually ticked -/// assert_eq!(count.load(Ordering::Relaxed), 0); -/// -/// // tick the ticker until task finishes -/// let thread_ticker = thread_executor.ticker().unwrap(); -/// thread_ticker.try_tick(); -/// assert_eq!(count.load(Ordering::Relaxed), 1); -/// ``` -#[derive(Debug)] -pub struct ThreadExecutor<'task> { - executor: Executor<'task>, - thread_id: ThreadId, -} - -impl<'task> Default for ThreadExecutor<'task> { - fn default() -> Self { - Self { - executor: Executor::new(), - thread_id: thread::current().id(), - } - } -} - -impl<'task> ThreadExecutor<'task> { - /// create a new [`ThreadExecutor`] - pub fn new() -> Self { - Self::default() - } - - /// Spawn a task on the thread executor - pub fn spawn( - &self, - future: impl Future + Send + 'task, - ) -> Task { - self.executor.spawn(future) - } - - /// Gets the [`ThreadExecutorTicker`] for this executor. - /// Use this to tick the executor. - /// It only returns the ticker if it's on the thread the executor was created on - /// and returns `None` otherwise. - pub fn ticker<'ticker>(&'ticker self) -> Option> { - if thread::current().id() == self.thread_id { - return Some(ThreadExecutorTicker { - executor: self, - _marker: PhantomData, - }); - } - None - } - - /// Returns true if `self` and `other`'s executor is same - pub fn is_same(&self, other: &Self) -> bool { - core::ptr::eq(self, other) - } -} - -/// Used to tick the [`ThreadExecutor`]. The executor does not -/// make progress unless it is manually ticked on the thread it was -/// created on. -#[derive(Debug)] -pub struct ThreadExecutorTicker<'task, 'ticker> { - executor: &'ticker ThreadExecutor<'task>, - // make type not send or sync - _marker: PhantomData<*const ()>, -} - -impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> { - /// Tick the thread executor. - pub async fn tick(&self) { - self.executor.executor.tick().await; - } - - /// Synchronously try to tick a task on the executor. - /// Returns false if does not find a task to tick. - pub fn try_tick(&self) -> bool { - self.executor.executor.try_tick() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use alloc::sync::Arc; - - #[test] - fn test_ticker() { - let executor = Arc::new(ThreadExecutor::new()); - let ticker = executor.ticker(); - assert!(ticker.is_some()); - - thread::scope(|s| { - s.spawn(|| { - let ticker = executor.ticker(); - assert!(ticker.is_none()); - }); - }); - } -} diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 40cabd76ac5e6..6905b8c538fea 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,9 +1,18 @@ -use super::TaskPool; +use super::{TaskPool, TaskPoolBuilder}; use bevy_platform::sync::OnceLock; use core::ops::Deref; +crate::cfg::bevy_executor! { + if { + use crate::bevy_executor::Executor; + } else { + use crate::edge_executor::Executor; + } +} + macro_rules! taskpool { - ($(#[$attr:meta])* ($static:ident, $type:ident)) => { + ($(#[$attr:meta])* ($static:ident, $executor:ident, $type:ident)) => { + static $executor: Executor = Executor::new(); static $static: OnceLock<$type> = OnceLock::new(); $(#[$attr])* @@ -12,8 +21,8 @@ macro_rules! taskpool { impl $type { #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] - pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) + pub fn get_or_init(f: impl FnOnce() -> TaskPoolBuilder) -> &'static Self { + $static.get_or_init(|| Self(f().build_static(&$executor))) } #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ @@ -56,7 +65,7 @@ taskpool! { /// See [`TaskPool`] documentation for details on Bevy tasks. /// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be /// completed before the next frame. - (COMPUTE_TASK_POOL, ComputeTaskPool) + (COMPUTE_TASK_POOL, COMPUTE_EXECUTOR, ComputeTaskPool) } taskpool! { @@ -64,7 +73,7 @@ taskpool! { /// /// See [`TaskPool`] documentation for details on Bevy tasks. /// Use [`ComputeTaskPool`] if the work must be complete before advancing to the next frame. - (ASYNC_COMPUTE_TASK_POOL, AsyncComputeTaskPool) + (ASYNC_COMPUTE_TASK_POOL, ASYNC_COMPUTE_EXECUTOR, AsyncComputeTaskPool) } taskpool! { @@ -72,38 +81,5 @@ taskpool! { /// "woken" state) /// /// See [`TaskPool`] documentation for details on Bevy tasks. - (IO_TASK_POOL, IoTaskPool) -} - -crate::cfg::web! { - if {} else { - /// A function used by `bevy_app` to tick the global tasks pools on the main thread. - /// This will run a maximum of 100 local tasks per executor per call to this function. - /// - /// # Warning - /// - /// This function *must* be called on the main thread, or the task pools will not be updated appropriately. - pub fn tick_global_task_pools_on_main_thread() { - COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|compute_local_executor| { - ASYNC_COMPUTE_TASK_POOL - .get() - .unwrap() - .with_local_executor(|async_local_executor| { - IO_TASK_POOL - .get() - .unwrap() - .with_local_executor(|io_local_executor| { - for _ in 0..100 { - compute_local_executor.try_tick(); - async_local_executor.try_tick(); - io_local_executor.try_tick(); - } - }); - }); - }); - } - } + (IO_TASK_POOL, IO_EXECUTOR, IoTaskPool) } diff --git a/crates/bevy_transform/Cargo.toml b/crates/bevy_transform/Cargo.toml index 42173c0f1776b..d593532e1963d 100644 --- a/crates/bevy_transform/Cargo.toml +++ b/crates/bevy_transform/Cargo.toml @@ -33,7 +33,7 @@ approx = "0.5.1" [features] # Turning off default features leaves you with a barebones # definition of transform. -default = ["std", "bevy-support", "bevy_reflect", "async_executor"] +default = ["std", "bevy-support", "bevy_reflect"] # Functionality @@ -55,12 +55,6 @@ bevy_reflect = [ "bevy_app/bevy_reflect", ] -# Executor Backend - -## Uses `async-executor` as a task execution backend. -## This backend is incompatible with `no_std` targets. -async_executor = ["std", "bevy_tasks/async_executor"] - # Platform Compatibility ## Allows access to the `std` crate. Enabling this feature will prevent compilation diff --git a/crates/bevy_transform/src/systems.rs b/crates/bevy_transform/src/systems.rs index 62038b37ed90f..2e9a113e66d8a 100644 --- a/crates/bevy_transform/src/systems.rs +++ b/crates/bevy_transform/src/systems.rs @@ -252,7 +252,7 @@ mod parallel { // TODO: this implementation could be used in no_std if there are equivalents of these. use alloc::{sync::Arc, vec::Vec}; use bevy_ecs::{entity::UniqueEntityIter, prelude::*, system::lifetimeless::Read}; - use bevy_tasks::{ComputeTaskPool, TaskPool}; + use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use bevy_utils::Parallel; use core::sync::atomic::{AtomicI32, Ordering}; use std::sync::{ @@ -320,7 +320,7 @@ mod parallel { } // Spawn workers on the task pool to recursively propagate the hierarchy in parallel. - let task_pool = ComputeTaskPool::get_or_init(TaskPool::default); + let task_pool = ComputeTaskPool::get_or_init(TaskPoolBuilder::default); task_pool.scope(|s| { (1..task_pool.thread_num()) // First worker is run locally instead of the task pool. .for_each(|_| s.spawn(async { propagation_worker(&queue, &nodes) })); @@ -559,13 +559,13 @@ mod test { use bevy_app::prelude::*; use bevy_ecs::{prelude::*, world::CommandQueue}; use bevy_math::{vec3, Vec3}; - use bevy_tasks::{ComputeTaskPool, TaskPool}; + use bevy_tasks::{ComputeTaskPool, TaskPoolBuilder}; use crate::systems::*; #[test] fn correct_parent_removed() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::default(); let offset_global_transform = |offset| GlobalTransform::from(Transform::from_xyz(offset, offset, offset)); @@ -626,7 +626,7 @@ mod test { #[test] fn did_propagate() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::default(); let mut schedule = Schedule::default(); @@ -702,7 +702,7 @@ mod test { #[test] fn correct_children() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); let mut world = World::default(); let mut schedule = Schedule::default(); @@ -783,7 +783,7 @@ mod test { #[test] fn correct_transforms_when_no_children() { let mut app = App::new(); - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); app.add_systems( Update, @@ -834,7 +834,7 @@ mod test { #[test] #[should_panic] fn panic_when_hierarchy_cycle() { - ComputeTaskPool::get_or_init(TaskPool::default); + ComputeTaskPool::get_or_init(TaskPoolBuilder::default); // We cannot directly edit ChildOf and Children, so we use a temp world to break the // hierarchy's invariants. let mut temp = World::new(); diff --git a/crates/bevy_winit/src/state.rs b/crates/bevy_winit/src/state.rs index 0eef31fb96640..94d0d8889064b 100644 --- a/crates/bevy_winit/src/state.rs +++ b/crates/bevy_winit/src/state.rs @@ -15,8 +15,6 @@ use bevy_input::{ use bevy_log::{trace, warn}; use bevy_math::{ivec2, DVec2, Vec2}; use bevy_platform::time::Instant; -#[cfg(not(target_arch = "wasm32"))] -use bevy_tasks::tick_global_task_pools_on_main_thread; use core::marker::PhantomData; #[cfg(target_arch = "wasm32")] use winit::platform::web::EventLoopExtWebSys; @@ -153,10 +151,7 @@ impl ApplicationHandler for WinitAppRunnerState { let _span = tracing::info_span!("winit event_handler").entered(); if self.app.plugins_state() != PluginsState::Cleaned { - if self.app.plugins_state() != PluginsState::Ready { - #[cfg(not(target_arch = "wasm32"))] - tick_global_task_pools_on_main_thread(); - } else { + if self.app.plugins_state() == PluginsState::Ready { self.app.finish(); self.app.cleanup(); } diff --git a/docs/cargo_features.md b/docs/cargo_features.md index 2f4bbd95e5c0b..8cd55bb514a39 100644 --- a/docs/cargo_features.md +++ b/docs/cargo_features.md @@ -14,7 +14,6 @@ The default feature set enables most of the expected features of a game engine, |android-game-activity|Android GameActivity support. Default, choose between this and `android-native-activity`.| |android_shared_stdcxx|Enable using a shared stdlib for cxx on Android| |animation|Enable animation support, and glTF animation loading| -|async_executor|Uses `async-executor` as a task execution backend.| |bevy_animation|Provides animation functionality| |bevy_anti_alias|Provides various anti aliasing solutions| |bevy_asset|Provides asset functionality|