From d8b0988938f8af12293deecdfab591f98b3758ae Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 18 Aug 2025 12:16:36 -0700 Subject: [PATCH 1/2] Relax the `Send` bound on fiber creation This commit removes the need for `Send` for all usages of fiber-related bits. The goal is that the future returned from `async` functions is `Send` if all the inputs are `Send`, but there's no actual need to require that in the function signatures themselves. This property was identified in upcoming work to make more internals of Wasmtime `async` where `async` functions are going to be used to implement the synchronous path through Wasmtime where `Send` isn't a bound today, nor ideally do we want to have it there. The way this commit works is: * First `make_fiber` now has a `Send` bound which makes it sound. Before #11444 it took `&mut dyn VMStore` which is never `Send` and the refactoring there missed adding this bound. This change required a few `Send` bounds in `concurrent.rs` where it's expected to have no impact as everything there is basically already `Send`. * Next `make_fiber_unchecked` is added which is the same as `make_fiber` except without a `Send` bound. The `on_fiber` function is then updated to use this `*_unchecked` variant. This means that fibers can now be used with non-`Send` stores. Crucially though the structure of internals in play means that future produced is still only `Send` if `T: Send` meaning that there is no loss in safety. * Next some `Send` bounds were dropped throughout async functions in Wasmtime as a proof-of-concept to require these changes to `on_fiber`. This means that these entrypoints into Wasmtime no longer require `Send`, but still naturally require `Send` if the result future is sent to various threads. * Finally a new doctest is added to `Instance::new_async` with a `compile_fail` example to ensure that this fails. This is unfortunately a very brittle test because all we can assert is that compilation failed, not why compilation failed. That means that this is likely to regress over time, but it's the best we can do at this time. Note that the goal here is NOT to remove `Send` bounds throughout Wasmtime. Many of them are still required, for example all the ones related to `Linker`. The realization is that we can remove some of the "edge" bounds on entrypoints where the resulting future can be conditionally `Send` depending on `T` meaning we don't actually have to write down any bounds ourselves. This refactoring will enable future refactorings to have sync-and-async functions use the same internals and neither entrypoint needs to have a `Send` bound. --- crates/wasmtime/Cargo.toml | 2 +- .../src/runtime/component/concurrent.rs | 15 +++-- .../wasmtime/src/runtime/externals/table.rs | 4 +- crates/wasmtime/src/runtime/fiber.rs | 37 ++++++++-- crates/wasmtime/src/runtime/instance.rs | 67 ++++++++++++++++--- crates/wasmtime/src/runtime/memory.rs | 11 +-- crates/wasmtime/src/runtime/store/async_.rs | 5 +- 7 files changed, 107 insertions(+), 34 deletions(-) diff --git a/crates/wasmtime/Cargo.toml b/crates/wasmtime/Cargo.toml index 5295f487bb84..37f87cc21511 100644 --- a/crates/wasmtime/Cargo.toml +++ b/crates/wasmtime/Cargo.toml @@ -103,7 +103,7 @@ tempfile = { workspace = true } libtest-mimic = { workspace = true } cranelift-native = { workspace = true } wasmtime-test-util = { workspace = true } -tokio = { workspace = true, features = ["macros", "sync"] } +tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] } [build-dependencies] cc = { workspace = true, optional = true } diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 6ef00bd880ca..b34ba9ca10bf 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -1075,7 +1075,7 @@ impl Instance { fun: impl AsyncFnOnce(&Accessor) -> R, ) -> Result where - T: 'static, + T: Send + 'static, { check_recursive_run(); let mut store = store.as_context_mut(); @@ -1165,7 +1165,10 @@ impl Instance { self, mut store: StoreContextMut<'_, T>, mut future: Pin<&mut impl Future>, - ) -> Result { + ) -> Result + where + T: Send, + { loop { // Take `ConcurrentState::futures` out of the instance so we can // poll it while also safely giving any of the futures inside access @@ -1320,7 +1323,7 @@ impl Instance { } /// Handle the specified work item, possibly resuming a fiber if applicable. - async fn handle_work_item( + async fn handle_work_item( self, store: StoreContextMut<'_, T>, item: WorkItem, @@ -1438,7 +1441,11 @@ impl Instance { } /// Execute the specified guest call on a worker fiber. - async fn run_on_worker(self, store: StoreContextMut<'_, T>, item: WorkerItem) -> Result<()> { + async fn run_on_worker( + self, + store: StoreContextMut<'_, T>, + item: WorkerItem, + ) -> Result<()> { let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() { fiber } else { diff --git a/crates/wasmtime/src/runtime/externals/table.rs b/crates/wasmtime/src/runtime/externals/table.rs index a8cfcb5ec7a5..30a8ec5615e6 100644 --- a/crates/wasmtime/src/runtime/externals/table.rs +++ b/crates/wasmtime/src/runtime/externals/table.rs @@ -107,7 +107,7 @@ impl Table { /// [`Store`](`crate::Store`) #[cfg(feature = "async")] pub async fn new_async( - mut store: impl AsContextMut, + mut store: impl AsContextMut, ty: TableType, init: Ref, ) -> Result { @@ -350,7 +350,7 @@ impl Table { #[cfg(feature = "async")] pub async fn grow_async( &self, - mut store: impl AsContextMut, + mut store: impl AsContextMut, delta: u64, init: Ref, ) -> Result { diff --git a/crates/wasmtime/src/runtime/fiber.rs b/crates/wasmtime/src/runtime/fiber.rs index 188c8100386f..9b181119fc48 100644 --- a/crates/wasmtime/src/runtime/fiber.rs +++ b/crates/wasmtime/src/runtime/fiber.rs @@ -769,7 +769,13 @@ fn resume_fiber<'a>( } /// Create a new `StoreFiber` which runs the specified closure. -pub(crate) fn make_fiber<'a, S>( +/// +/// # Safety +/// +/// The returned `StoreFiber<'a>` structure is unconditionally `Send` but the +/// send-ness is actually a function of `S`. When `S` is statically known to be +/// `Send` then use the safe [`make_fiber`] function. +pub(crate) unsafe fn make_fiber_unchecked<'a, S>( store: &mut S, fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a, ) -> Result> @@ -852,6 +858,19 @@ where }) } +/// Safe wrapper around [`make_fiber_unchecked`] which requires that `S` is +/// `Send`. +#[cfg(feature = "component-model-async")] +pub(crate) fn make_fiber<'a, S>( + store: &mut S, + fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a, +) -> Result> +where + S: AsStoreOpaque + Send + ?Sized + 'a, +{ + unsafe { make_fiber_unchecked(store, fun) } +} + /// Run the specified function on a newly-created fiber and `.await` its /// completion. pub(crate) async fn on_fiber( @@ -868,10 +887,18 @@ where debug_assert!(config.async_stack_size > 0); let mut result = None; - let fiber = make_fiber(store, |store| { - result = Some(func(store)); - Ok(()) - })?; + + // SAFETY: the `StoreFiber` returned by `make_fiber_unchecked` is `Send` + // despite we not actually knowing here whether `S` is `Send` or not. That + // is safe here, however, because this function is already conditionally + // `Send` based on `S`. Additionally `fiber` doesn't escape this function, + // so the future-of-this-function is still correctly `Send`-vs-not. + let fiber = unsafe { + make_fiber_unchecked(store, |store| { + result = Some(func(store)); + Ok(()) + })? + }; { let fiber = FiberFuture { diff --git a/crates/wasmtime/src/runtime/instance.rs b/crates/wasmtime/src/runtime/instance.rs index 75b6ec50a5ce..200a79aa4e10 100644 --- a/crates/wasmtime/src/runtime/instance.rs +++ b/crates/wasmtime/src/runtime/instance.rs @@ -137,9 +137,60 @@ impl Instance { /// /// This function will also panic, like [`Instance::new`], if any [`Extern`] /// specified does not belong to `store`. + /// + /// # Examples + /// + /// An example of using this function: + /// + /// ``` + /// use wasmtime::{Result, Store, Engine, Config, Module, Instance}; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let mut config = Config::new(); + /// config.async_support(true); + /// let engine = Engine::new(&config)?; + /// + /// // For this example, a module with no imports is being used hence + /// // the empty array to `Instance::new_async`. + /// let module = Module::new(&engine, "(module)")?; + /// let mut store = Store::new(&engine, ()); + /// let instance = Instance::new_async(&mut store, &module, &[]).await?; + /// + /// // ... use `instance` and exports and such ... + /// + /// Ok(()) + /// } + /// ``` + /// + /// Note, though, that the future returned from this function is only + /// `Send` if the store's own data is `Send` meaning that this does not + /// compile for example: + /// + /// ```compile_fail + /// use wasmtime::{Result, Store, Engine, Config, Module, Instance}; + /// use std::rc::Rc; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let mut config = Config::new(); + /// config.async_support(true); + /// let engine = Engine::new(&config)?; + /// + /// let module = Module::new(&engine, "(module)")?; + /// let mut store = Store::new(&engine, Rc::new(())); + /// + /// // Compile failure because `Store>` is not `Send` + /// assert_send(Instance::new_async(&mut store, &module, &[])).await?; + /// + /// Ok(()) + /// } + /// + /// fn assert_send(t: T) -> T { t } + /// ``` #[cfg(feature = "async")] pub async fn new_async( - mut store: impl AsContextMut, + mut store: impl AsContextMut, module: &Module, imports: &[Extern], ) -> Result { @@ -229,10 +280,7 @@ impl Instance { store: &mut StoreContextMut<'_, T>, module: &Module, imports: Imports<'_>, - ) -> Result - where - T: Send + 'static, - { + ) -> Result { assert!( store.0.async_support(), "must use sync instantiation when async support is disabled", @@ -850,14 +898,15 @@ impl InstancePre { /// /// Panics if any import closed over by this [`InstancePre`] isn't owned by /// `store`, or if `store` does not have async support enabled. + /// + /// ```compile_fail + /// what + /// ``` #[cfg(feature = "async")] pub async fn instantiate_async( &self, mut store: impl AsContextMut, - ) -> Result - where - T: Send, - { + ) -> Result { let mut store = store.as_context_mut(); let imports = pre_instantiate_raw( &mut store.0, diff --git a/crates/wasmtime/src/runtime/memory.rs b/crates/wasmtime/src/runtime/memory.rs index 50e79d91a823..6524c10c9b7e 100644 --- a/crates/wasmtime/src/runtime/memory.rs +++ b/crates/wasmtime/src/runtime/memory.rs @@ -271,10 +271,7 @@ impl Memory { /// This function will panic when used with a non-async /// [`Store`](`crate::Store`). #[cfg(feature = "async")] - pub async fn new_async( - mut store: impl AsContextMut, - ty: MemoryType, - ) -> Result { + pub async fn new_async(mut store: impl AsContextMut, ty: MemoryType) -> Result { let mut store = store.as_context_mut(); assert!( store.0.async_support(), @@ -623,11 +620,7 @@ impl Memory { /// This function will panic when used with a non-async /// [`Store`](`crate::Store`). #[cfg(feature = "async")] - pub async fn grow_async( - &self, - mut store: impl AsContextMut, - delta: u64, - ) -> Result { + pub async fn grow_async(&self, mut store: impl AsContextMut, delta: u64) -> Result { let mut store = store.as_context_mut(); assert!( store.0.async_support(), diff --git a/crates/wasmtime/src/runtime/store/async_.rs b/crates/wasmtime/src/runtime/store/async_.rs index b326d6e1c174..bb2d8f93bfb5 100644 --- a/crates/wasmtime/src/runtime/store/async_.rs +++ b/crates/wasmtime/src/runtime/store/async_.rs @@ -278,10 +278,7 @@ impl StoreContextMut<'_, T> { pub(crate) async fn on_fiber( &mut self, func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send + Sync, - ) -> Result - where - T: Send + 'static, - { + ) -> Result { fiber::on_fiber(self.0, |me| func(&mut StoreContextMut(me))).await } } From 5a00996c1e53cf40067fe9a0ea2f74b9e0509f6b Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 18 Aug 2025 22:08:45 -0700 Subject: [PATCH 2/2] Review comments --- crates/wasmtime/src/runtime/instance.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/wasmtime/src/runtime/instance.rs b/crates/wasmtime/src/runtime/instance.rs index 200a79aa4e10..938129d34966 100644 --- a/crates/wasmtime/src/runtime/instance.rs +++ b/crates/wasmtime/src/runtime/instance.rs @@ -178,6 +178,9 @@ impl Instance { /// let engine = Engine::new(&config)?; /// /// let module = Module::new(&engine, "(module)")?; + /// + /// // Note that `Rc<()>` is NOT `Send`, which is what many future + /// // runtimes require and below will cause a failure. /// let mut store = Store::new(&engine, Rc::new(())); /// /// // Compile failure because `Store>` is not `Send` @@ -898,10 +901,6 @@ impl InstancePre { /// /// Panics if any import closed over by this [`InstancePre`] isn't owned by /// `store`, or if `store` does not have async support enabled. - /// - /// ```compile_fail - /// what - /// ``` #[cfg(feature = "async")] pub async fn instantiate_async( &self,