From fc848e52db782788af5ccb9c8ddc6ce1d0bf148b Mon Sep 17 00:00:00 2001 From: Matthew Leach Date: Tue, 11 Feb 2025 20:19:12 +0000 Subject: [PATCH 1/5] reactor: uring: create type for pending_io slab entries Any IO that is pending is put into the `pending_io` slab. Currently this is a tuple, which can make the interpretation of the fields difficult. Therefore, create a struct so that the fields are assigned names. --- src/reactor/uring.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs index ea66996..476a7ca 100644 --- a/src/reactor/uring.rs +++ b/src/reactor/uring.rs @@ -35,11 +35,11 @@ impl ReactorUring { ) }; - let (objs, results) = RefMut::map_split(borrow, |x| (&mut x.objs, &mut x.results)); + let (pending, results) = RefMut::map_split(borrow, |x| (&mut x.pending, &mut x.results)); IoCompletionIter { compl_queue, - objs, + pending, results, } } @@ -47,23 +47,31 @@ impl ReactorUring { struct ReactorInner { uring: IoUring, - objs: Slab<(T, usize)>, + pending: Slab>, results: RingResults, } +struct PendingIo { + assoc_obj: T, + result_slab_idx: usize, +} + impl ReactorInner { pub fn new() -> Self { Self { uring: IoUring::new(1024).unwrap(), - objs: Slab::new(), + pending: Slab::new(), results: RingResults::new(), } } pub fn submit_io(&mut self, entry: squeue::Entry, obj: T) -> usize { - let result_idx = self.results.create_slot(); + let result_slab_idx = self.results.create_slot(); - let slot = self.objs.insert((obj, result_idx)); + let slot = self.pending.insert(PendingIo { + assoc_obj: obj, + result_slab_idx, + }); unsafe { self.uring @@ -72,13 +80,13 @@ impl ReactorInner { .unwrap(); } - result_idx + result_slab_idx } } pub struct IoCompletionIter<'a, T> { compl_queue: CompletionQueue<'a>, - objs: RefMut<'a, Slab<(T, usize)>>, + pending: RefMut<'a, Slab>>, results: RefMut<'a, RingResults>, } @@ -88,10 +96,10 @@ impl Iterator for IoCompletionIter<'_, T> { fn next(&mut self) -> Option { let entry = self.compl_queue.next()?; - let (obj, result_idx) = self.objs.remove(entry.user_data() as usize); - self.results.set_result(entry.result(), result_idx); + let pending_io = self.pending.remove(entry.user_data() as usize); + self.results.set_result(entry.result(), pending_io.result_slab_idx); - Some(obj) + Some(pending_io.assoc_obj) } } From b55768632da3df4f225958982dcd0bcc6029ce04 Mon Sep 17 00:00:00 2001 From: Matthew Leach Date: Tue, 11 Feb 2025 20:32:54 +0000 Subject: [PATCH 2/5] reactor: uring: restrict access Remove access to functions that do not need to be accessible outside of the `reactor` module. --- src/reactor/uring.rs | 12 +++++++++--- src/reactor/uring/io.rs | 9 +++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs index 476a7ca..b424fd8 100644 --- a/src/reactor/uring.rs +++ b/src/reactor/uring.rs @@ -51,13 +51,19 @@ struct ReactorInner { results: RingResults, } +enum IoKind { + Oneshot, + Multi, +} + struct PendingIo { assoc_obj: T, result_slab_idx: usize, + kind: IoKind, } impl ReactorInner { - pub fn new() -> Self { + fn new() -> Self { Self { uring: IoUring::new(1024).unwrap(), pending: Slab::new(), @@ -65,7 +71,7 @@ impl ReactorInner { } } - pub fn submit_io(&mut self, entry: squeue::Entry, obj: T) -> usize { + fn submit_io(&mut self, entry: squeue::Entry, obj: T) -> usize { let result_slab_idx = self.results.create_slot(); let slot = self.pending.insert(PendingIo { @@ -158,7 +164,7 @@ mod tests { f(a, b, &mut uring); - assert_eq!(uring.inner.borrow().results.0.len(), 0); + assert!(uring.inner.borrow().results.is_empty()); } #[test] diff --git a/src/reactor/uring/io.rs b/src/reactor/uring/io.rs index 22d8df8..451d384 100644 --- a/src/reactor/uring/io.rs +++ b/src/reactor/uring/io.rs @@ -6,7 +6,7 @@ use slab::Slab; use super::ReactorInner; #[derive(Debug)] -pub(crate) enum IoState { +enum IoState { New, Submitted(usize), Finished(i32), @@ -71,7 +71,7 @@ impl<'a, T> Drop for UringIo<'a, T> { } } -pub struct RingResults(pub(super) Slab); +pub struct RingResults(Slab); pub(super) enum ResultState { Pending, @@ -84,6 +84,11 @@ impl RingResults { Self(Slab::new()) } + #[cfg(test)] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + pub fn set_result(&mut self, result: i32, idx: usize) { let r_entry = self.0.get_mut(idx).unwrap(); From 3cacc89f635526cb412e1d1d873d8b0421541ab4 Mon Sep 17 00:00:00 2001 From: Matthew Leach Date: Fri, 21 Feb 2025 20:11:53 +0000 Subject: [PATCH 3/5] reactor: add multishot IO support Support multishot SQEs. --- Cargo.lock | 101 +++++++++++++++++++++++++ Cargo.toml | 2 + src/reactor/mod.rs | 4 + src/reactor/uring.rs | 50 ++++++++---- src/reactor/uring/io.rs | 77 +++++-------------- src/reactor/uring/result.rs | 146 ++++++++++++++++++++++++++++++++++++ src/task.rs | 5 +- 7 files changed, 307 insertions(+), 78 deletions(-) create mode 100644 src/reactor/uring/result.rs diff --git a/Cargo.lock b/Cargo.lock index d909419..a384bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "aho-corasick" version = "1.1.3" @@ -87,6 +102,21 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + [[package]] name = "bitflags" version = "2.8.0" @@ -231,6 +261,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + [[package]] name = "getrandom" version = "0.3.1" @@ -243,6 +279,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + [[package]] name = "globset" version = "0.4.15" @@ -336,12 +378,36 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "miniz_oxide" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3b1c9bd4fe1f0f8b387f6eb9eb3b4a1aa26185e5750efb9140301703f62cd1b" +dependencies = [ + "adler2", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "predicates" version = "3.1.3" @@ -416,6 +482,18 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "ringbuffer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "rustix" version = "0.38.44" @@ -504,6 +582,27 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "pin-project-lite", +] + +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "trale" version = "0.2.0" @@ -515,7 +614,9 @@ dependencies = [ "io-uring", "libc", "log", + "ringbuffer", "slab", + "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4bc3ab4..6c9c54a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,9 @@ keywords = ["io", "async", "non-blocking", "futures"] io-uring = "0.7.4" libc = "0.2.167" log = "0.4.22" +ringbuffer = "0.15.0" slab = "0.4.9" +tokio-stream = { version = "0.1.17", default-features = false } [dev-dependencies] anyhow = "1.0.81" diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 03ef2ac..f6d5e07 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -16,6 +16,10 @@ impl Reactor { REACTOR.with(|r| unsafe { transmute(r.new_io()) }) } + pub fn new_multishot_io() -> ReactorIo { + REACTOR.with(|r| unsafe { transmute(r.new_multishot_io()) }) + } + pub fn react() { REACTOR.with(|r| { for waker in r.react() { diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs index b424fd8..6888087 100644 --- a/src/reactor/uring.rs +++ b/src/reactor/uring.rs @@ -1,16 +1,18 @@ -use io::RingResults; pub(crate) use io::UringIo; use io_uring::{squeue, CompletionQueue, IoUring}; +use log::info; +use result::RingResults; use slab::Slab; use std::cell::{RefCell, RefMut}; mod io; +mod result; -pub struct ReactorUring { +pub struct ReactorUring { inner: RefCell>, } -impl ReactorUring { +impl ReactorUring { pub fn new() -> Self { Self { inner: RefCell::new(ReactorInner::new()), @@ -18,7 +20,11 @@ impl ReactorUring { } pub fn new_io(&self) -> UringIo<'_, T> { - UringIo::new(&self.inner) + UringIo::new(&self.inner, IoKind::Oneshot) + } + + pub fn new_multishot_io(&self) -> UringIo<'_, T> { + UringIo::new(&self.inner, IoKind::Multi) } pub fn react(&self) -> IoCompletionIter<'_, T> { @@ -35,12 +41,9 @@ impl ReactorUring { ) }; - let (pending, results) = RefMut::map_split(borrow, |x| (&mut x.pending, &mut x.results)); - IoCompletionIter { compl_queue, - pending, - results, + ring: borrow, } } } @@ -51,11 +54,13 @@ struct ReactorInner { results: RingResults, } +#[derive(Clone, Copy)] enum IoKind { Oneshot, Multi, } +#[derive(Clone)] struct PendingIo { assoc_obj: T, result_slab_idx: usize, @@ -71,12 +76,13 @@ impl ReactorInner { } } - fn submit_io(&mut self, entry: squeue::Entry, obj: T) -> usize { - let result_slab_idx = self.results.create_slot(); + fn submit_io(&mut self, entry: squeue::Entry, obj: T, kind: IoKind) -> usize { + let result_slab_idx = self.results.get(kind).create_slot(); let slot = self.pending.insert(PendingIo { assoc_obj: obj, result_slab_idx, + kind, }); unsafe { @@ -90,20 +96,32 @@ impl ReactorInner { } } -pub struct IoCompletionIter<'a, T> { +pub struct IoCompletionIter<'a, T: Clone> { compl_queue: CompletionQueue<'a>, - pending: RefMut<'a, Slab>>, - results: RefMut<'a, RingResults>, + ring: RefMut<'a, ReactorInner>, } -impl Iterator for IoCompletionIter<'_, T> { +impl Iterator for IoCompletionIter<'_, T> { type Item = T; fn next(&mut self) -> Option { let entry = self.compl_queue.next()?; - let pending_io = self.pending.remove(entry.user_data() as usize); - self.results.set_result(entry.result(), pending_io.result_slab_idx); + let pending_io = self + .ring + .pending + .get_mut(entry.user_data() as usize) + .unwrap() + .clone(); + + self.ring + .results + .get(pending_io.kind) + .set_result(entry.result(), pending_io.result_slab_idx); + + if let IoKind::Oneshot = pending_io.kind { + self.ring.pending.remove(entry.user_data() as usize); + } Some(pending_io.assoc_obj) } diff --git a/src/reactor/uring/io.rs b/src/reactor/uring/io.rs index 451d384..8af111b 100644 --- a/src/reactor/uring/io.rs +++ b/src/reactor/uring/io.rs @@ -1,9 +1,7 @@ use std::{cell::RefCell, task::Poll}; +use super::{IoKind, ReactorInner}; use io_uring::squeue; -use slab::Slab; - -use super::ReactorInner; #[derive(Debug)] enum IoState { @@ -14,6 +12,7 @@ enum IoState { pub(crate) struct UringIo<'a, T> { state: IoState, + kind: IoKind, ring: &'a RefCell>, } @@ -32,9 +31,10 @@ impl From<&IoState> for Poll> { } impl<'a, T> UringIo<'a, T> { - pub(super) fn new(ring: &'a RefCell>) -> Self { + pub(super) fn new(ring: &'a RefCell>, kind: IoKind) -> Self { Self { state: IoState::New, + kind, ring, } } @@ -46,14 +46,18 @@ impl<'a, T> UringIo<'a, T> { match self.state { IoState::New => { let (entry, obj) = f(); - let result_slot = self.ring.borrow_mut().submit_io(entry, obj); + let result_slot = self.ring.borrow_mut().submit_io(entry, obj, self.kind); self.state = IoState::Submitted(result_slot); } IoState::Submitted(slot) => { let mut ring = self.ring.borrow_mut(); - if let Some(res) = ring.results.get_result(slot) { - self.state = IoState::Finished(res); - ring.results.drop_result(slot); + let result_store = ring.results.get(self.kind); + + if let Some(res) = result_store.pop_result(slot) { + match self.kind { + IoKind::Oneshot => self.state = IoState::Finished(res), + IoKind::Multi => return (&IoState::Finished(res)).into(), + }; } } IoState::Finished(_) => {} @@ -66,58 +70,11 @@ impl<'a, T> UringIo<'a, T> { impl<'a, T> Drop for UringIo<'a, T> { fn drop(&mut self) { if let IoState::Submitted(slot) = self.state { - self.ring.borrow_mut().results.drop_result(slot); + self.ring + .borrow_mut() + .results + .get(self.kind) + .drop_result(slot); } } } - -pub struct RingResults(Slab); - -pub(super) enum ResultState { - Pending, - Set(i32), - Dropped, -} - -impl RingResults { - pub fn new() -> Self { - Self(Slab::new()) - } - - #[cfg(test)] - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - pub fn set_result(&mut self, result: i32, idx: usize) { - let r_entry = self.0.get_mut(idx).unwrap(); - - if matches!(r_entry, ResultState::Dropped) { - self.0.remove(idx); - } else { - *r_entry = ResultState::Set(result); - } - } - - pub fn get_result(&self, idx: usize) -> Option { - match self.0.get(idx).unwrap() { - ResultState::Pending => None, - ResultState::Set(result) => Some(*result), - ResultState::Dropped => panic!("Should not be able to get a dropped result"), - } - } - - pub fn drop_result(&mut self, idx: usize) { - let r_entry = self.0.get_mut(idx).unwrap(); - - if matches!(r_entry, ResultState::Set(_)) { - self.0.remove(idx); - } else { - *r_entry = ResultState::Dropped; - } - } - - pub fn create_slot(&mut self) -> usize { - self.0.insert(ResultState::Pending) - } -} diff --git a/src/reactor/uring/result.rs b/src/reactor/uring/result.rs new file mode 100644 index 0000000..952666d --- /dev/null +++ b/src/reactor/uring/result.rs @@ -0,0 +1,146 @@ +use ringbuffer::{ConstGenericRingBuffer, RingBuffer}; +use slab::Slab; + +use super::IoKind; + +pub trait ResultStore { + fn set_result(&mut self, result: i32, idx: usize); + fn pop_result(&mut self, idx: usize) -> Option; + fn drop_result(&mut self, idx: usize); + fn create_slot(&mut self) -> usize; +} + +pub(super) enum ResultState { + Pending, + Set(i32), + Dropped, +} + +struct OneshotStore(Slab); + +impl OneshotStore { + pub fn new() -> Self { + Self(Slab::new()) + } + + #[cfg(test)] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl ResultStore for OneshotStore { + fn set_result(&mut self, result: i32, idx: usize) { + let r_entry = self.0.get_mut(idx).unwrap(); + + if matches!(r_entry, ResultState::Dropped) { + self.0.remove(idx); + } else { + *r_entry = ResultState::Set(result); + } + } + + fn pop_result(&mut self, idx: usize) -> Option { + let res = match self.0.get(idx).unwrap() { + ResultState::Pending => None, + ResultState::Set(result) => { + let ret = Some(*result); + self.0.remove(idx); + ret + } + ResultState::Dropped => panic!("Should not be able to get a dropped result"), + }; + + res + } + + fn drop_result(&mut self, idx: usize) { + let r_entry = self.0.get_mut(idx).unwrap(); + + if matches!(r_entry, ResultState::Set(_)) { + self.0.remove(idx); + } else { + *r_entry = ResultState::Dropped; + } + } + + fn create_slot(&mut self) -> usize { + self.0.insert(ResultState::Pending) + } +} + +enum MultishotResultState { + Active(ConstGenericRingBuffer), + Dropped, +} + +struct MultishotStore(Slab); + +impl MultishotStore { + fn new() -> Self { + Self(Slab::new()) + } + + #[cfg(test)] + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl ResultStore for MultishotStore { + fn set_result(&mut self, result: i32, idx: usize) { + let r_entry = self.0.get_mut(idx).unwrap(); + + match r_entry { + MultishotResultState::Active(ref mut ring) => { + ring.push(result); + } + + // If the IO has been dropped, ignore any results. + MultishotResultState::Dropped => {} + } + } + + fn pop_result(&mut self, idx: usize) -> Option { + match self.0.get_mut(idx).unwrap() { + MultishotResultState::Active(ref mut ring) => ring.dequeue(), + MultishotResultState::Dropped => panic!("Shoult not be able to get a dropped result"), + } + } + + fn drop_result(&mut self, idx: usize) { + let r_entry = self.0.get_mut(idx).unwrap(); + *r_entry = MultishotResultState::Dropped; + } + + fn create_slot(&mut self) -> usize { + self.0 + .insert(MultishotResultState::Active(ConstGenericRingBuffer::new())) + } +} + +pub struct RingResults { + oneshot: OneshotStore, + multishot: MultishotStore, +} + +impl RingResults { + pub fn new() -> Self { + Self { + oneshot: OneshotStore::new(), + multishot: MultishotStore::new(), + } + } + + pub fn get(&mut self, kind: IoKind) -> &mut dyn ResultStore { + match kind { + IoKind::Oneshot => &mut self.oneshot, + IoKind::Multi => &mut self.multishot, + } + } + + #[cfg(test)] + pub fn is_empty(&self) -> bool { + self.oneshot.is_empty() && self.multishot.is_empty() + } +} diff --git a/src/task.rs b/src/task.rs index f732dc6..e501877 100644 --- a/src/task.rs +++ b/src/task.rs @@ -117,8 +117,9 @@ impl Wake for TaskId { fn wake(self: Arc) { EXEC.with(|exec| { let mut exec = exec.borrow_mut(); - let task = exec.waiting.remove(self.0.load(Ordering::Relaxed)); - exec.run_q.push(task); + if let Some(task) = exec.waiting.try_remove(self.0.load(Ordering::Relaxed)) { + exec.run_q.push(task); + } }); } } From e05823b58ae9300dfe696e47655e391537652109 Mon Sep 17 00:00:00 2001 From: Matthew Leach Date: Fri, 21 Feb 2025 20:12:31 +0000 Subject: [PATCH 4/5] futures: tcp: use multishot IO Use the AcceptMulti event, so that a single SQE can accept multiple connections. --- examples/tcp_serv.rs | 6 ++-- examples/web_server.rs | 8 +++-- src/futures/tcp.rs | 75 ++++++++++++++++++++---------------------- src/reactor/uring.rs | 1 - 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/examples/tcp_serv.rs b/examples/tcp_serv.rs index 904e959..5abcd2f 100644 --- a/examples/tcp_serv.rs +++ b/examples/tcp_serv.rs @@ -2,6 +2,7 @@ use std::time::Duration; use anyhow::{Context, Result}; use log::debug; +use tokio_stream::StreamExt; use trale::{ futures::{read::AsyncRead, tcp::TcpListener, timer::Timer, write::AsyncWrite}, task::{Executor, TaskJoiner}, @@ -24,13 +25,14 @@ fn main() -> Result<()> { let echo_task: TaskJoiner> = Executor::spawn(async { let mut buf = [0u8; 1]; let mut bytes_read: usize = 0; - let listener = TcpListener::bind("127.0.0.1:5000").context("Could not bind")?; + let mut listener = TcpListener::bind("127.0.0.1:5000").context("Could not bind")?; println!("Waiting for connection on 127.0.0.1:5000"); let mut conn = listener - .accept() + .next() .await + .unwrap() .context("Could not accept incoming connection")?; // We only want to accept a single connection. Drop the lisetner once diff --git a/examples/web_server.rs b/examples/web_server.rs index c68a7c7..7bfe1be 100644 --- a/examples/web_server.rs +++ b/examples/web_server.rs @@ -8,6 +8,7 @@ use std::{ path::PathBuf, sync::OnceLock, }; +use tokio_stream::StreamExt; use trale::{ futures::{ fs::File, @@ -151,12 +152,12 @@ fn main() -> anyhow::Result<()> { ARGS.set(args).expect("Should have never been set"); - let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ARGS.get().unwrap().port)) + let mut listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ARGS.get().unwrap().port)) .context("Could not setup socket listener")?; Executor::block_on(async move { - loop { - match listener.accept().await { + while let Some(conn) = listener.next().await { + match conn { Ok(conn) => { Executor::spawn(async { if let Err(e) = handle_connection(conn).await { @@ -167,6 +168,7 @@ fn main() -> anyhow::Result<()> { Err(e) => error!("Could not accept incoming connection: {e:?}"), } } + eprintln!("Bye!"); }); Ok(()) diff --git a/src/futures/tcp.rs b/src/futures/tcp.rs index 58fa5cb..aea81b0 100644 --- a/src/futures/tcp.rs +++ b/src/futures/tcp.rs @@ -13,16 +13,18 @@ //! use trale::futures::tcp::TcpListener; //! use trale::futures::read::AsyncRead; //! use trale::futures::write::AsyncWrite; +//! use tokio_stream::StreamExt; //! async { -//! let listener = TcpListener::bind("0.0.0.0:8888")?; -//! let mut sock = listener.accept().await?; +//! let mut listener = TcpListener::bind("0.0.0.0:8888")?; +//! while let Some(Ok(mut sock)) = listener.next().await { //! let mut buf = [0u8; 1]; -//! loop { -//! let len = sock.read(&mut buf).await?; -//! if len == 0 { -//! return Ok(()); +//! loop { +//! let len = sock.read(&mut buf).await?; +//! if len == 0 { +//! return Ok(()); +//! } +//! sock.write(&buf).await?; //! } -//! sock.write(&buf).await?; //! } //!# Ok::<(), std::io::Error>(()) //! }; @@ -33,12 +35,12 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd}, pin::Pin, - ptr::null_mut, task::{Context, Poll}, }; use io_uring::{opcode, types}; use libc::{AF_INET, AF_INET6, SOCK_STREAM}; +use tokio_stream::Stream; use crate::reactor::{Reactor, ReactorIo}; @@ -54,14 +56,6 @@ use super::{ /// accept connections on the specified address. pub struct TcpListener { inner: OwnedFd, -} - -/// A future for accepting new connections. -/// -/// Call `.await` to pause the current task until a new connection has been -/// established. -pub struct Acceptor<'fd> { - inner: BorrowedFd<'fd>, io: ReactorIo, } @@ -85,8 +79,11 @@ impl TcpListener { /// This function will create a new socket, bind it to one of the specified /// `addrs` and returna [TcpListener]. If binding to *all* of the specified /// addresses fails then the reason for failing to bind to the *last* - /// address is returned. Otherwise, use [TcpListener::accept] to obtain a - /// future to accept new connections. + /// address is returned. Otherwise, `.await` on the listener to await a new + /// connection. + /// + /// When this future returns None, the listener will no long accept any + /// further connections and should be dropped. pub fn bind(addrs: impl ToSocketAddrs) -> std::io::Result { let addrs = addrs.to_socket_addrs()?; let mut last_err = ErrorKind::NotFound.into(); @@ -102,38 +99,38 @@ impl TcpListener { match unsafe { libc::listen(sock.as_raw_fd(), 1024) } { -1 => last_err = std::io::Error::last_os_error(), - 0 => return Ok(Self { inner: sock }), + 0 => { + return Ok(Self { + inner: sock, + io: Reactor::new_multishot_io(), + }) + } _ => unreachable!("listen() cannot return a value other than 0 or -1"), } } Err(last_err) } - - /// Return a future for accepting a new connection. - /// - /// You can `.await` the returned future to wait for a new incoming - /// connection. Once a connection has been successfully established, a new - /// [TcpStream] is returned which is connected to the peer. - pub fn accept(&self) -> Acceptor { - Acceptor { - inner: self.inner.as_fd(), - io: Reactor::new_io(), - } - } } -impl Future for Acceptor<'_> { - type Output = std::io::Result; +impl Stream for TcpListener { + type Item = std::io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let entry = opcode::Accept::new(types::Fd(self.inner.as_raw_fd()), null_mut(), null_mut()); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - self.io - .submit_or_get_result(|| (entry.build(), cx.waker().clone())) + this.io + .submit_or_get_result(|| { + ( + opcode::AcceptMulti::new(types::Fd(this.inner.as_raw_fd())).build(), + cx.waker().clone(), + ) + }) .map(|x| { - x.map(|fd| TcpStream { - inner: unsafe { OwnedFd::from_raw_fd(fd) }, + Some({ + x.map(|fd| TcpStream { + inner: unsafe { OwnedFd::from_raw_fd(fd) }, + }) }) }) } diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs index 6888087..21af6a1 100644 --- a/src/reactor/uring.rs +++ b/src/reactor/uring.rs @@ -1,6 +1,5 @@ pub(crate) use io::UringIo; use io_uring::{squeue, CompletionQueue, IoUring}; -use log::info; use result::RingResults; use slab::Slab; use std::cell::{RefCell, RefMut}; From b030fe807c58db600f02cb592a31412f59c406a2 Mon Sep 17 00:00:00 2001 From: Matthew Leach Date: Thu, 1 May 2025 07:36:29 +0100 Subject: [PATCH 5/5] multishot: implement detection of `finished` When a multishot IO will no longer yield a value the IORING_CQE_F_MORE flag will not be set. Detect that here and make all multishot IOs return `Poll::Ready(None)` in that case to signal that the end of the stream has been reached. --- examples/udp.rs | 4 +- examples/web_server.rs | 8 +-- src/futures/tcp.rs | 6 +- src/reactor/mod.rs | 14 +++-- src/reactor/uring.rs | 68 ++++++++++++--------- src/reactor/uring/io.rs | 80 ------------------------- src/reactor/uring/io/mod.rs | 10 ++++ src/reactor/uring/io/multishot.rs | 67 +++++++++++++++++++++ src/reactor/uring/io/oneshot.rs | 75 +++++++++++++++++++++++ src/reactor/uring/result.rs | 98 +++++++++++++++++-------------- 10 files changed, 262 insertions(+), 168 deletions(-) delete mode 100644 src/reactor/uring/io.rs create mode 100644 src/reactor/uring/io/mod.rs create mode 100644 src/reactor/uring/io/multishot.rs create mode 100644 src/reactor/uring/io/oneshot.rs diff --git a/examples/udp.rs b/examples/udp.rs index f10175f..54e4199 100644 --- a/examples/udp.rs +++ b/examples/udp.rs @@ -24,11 +24,11 @@ fn main() { }); Executor::spawn(async { - let mut buf = [0xadu8; 20]; + let buf = [0xadu8; 20]; let udpsock = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap(); Timer::sleep(Duration::from_secs(1)).unwrap().await; let len = udpsock - .send_to(&mut buf, (Ipv4Addr::LOCALHOST, 9998)) + .send_to(&buf, (Ipv4Addr::LOCALHOST, 9998)) .await .unwrap(); diff --git a/examples/web_server.rs b/examples/web_server.rs index 7bfe1be..03ae513 100644 --- a/examples/web_server.rs +++ b/examples/web_server.rs @@ -134,12 +134,10 @@ async fn handle_connection(mut conn: TcpStream) -> anyhow::Result<()> { let file = if path == PathBuf::from("/") { ARGS.get().unwrap().webroot.join("index.html") + } else if let Ok(path) = path.strip_prefix("/") { + ARGS.get().unwrap().webroot.join(path) } else { - if let Ok(path) = path.strip_prefix("/") { - ARGS.get().unwrap().webroot.join(path) - } else { - return send_response_hdr(&mut conn, Response::NotFound, 0).await; - } + return send_response_hdr(&mut conn, Response::NotFound, 0).await; }; send_file(conn, file).await diff --git a/src/futures/tcp.rs b/src/futures/tcp.rs index aea81b0..0732141 100644 --- a/src/futures/tcp.rs +++ b/src/futures/tcp.rs @@ -42,7 +42,7 @@ use io_uring::{opcode, types}; use libc::{AF_INET, AF_INET6, SOCK_STREAM}; use tokio_stream::Stream; -use crate::reactor::{Reactor, ReactorIo}; +use crate::reactor::{MultishotReactorIo, Reactor, ReactorIo}; use super::{ read::{AsyncRead, AsyncReader}, @@ -56,7 +56,7 @@ use super::{ /// accept connections on the specified address. pub struct TcpListener { inner: OwnedFd, - io: ReactorIo, + io: MultishotReactorIo, } fn mk_sock(addr: &SocketAddr) -> std::io::Result { @@ -127,7 +127,7 @@ impl Stream for TcpListener { ) }) .map(|x| { - Some({ + x.map(|x| { x.map(|fd| TcpStream { inner: unsafe { OwnedFd::from_raw_fd(fd) }, }) diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index f6d5e07..63283c1 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -1,9 +1,11 @@ -use std::{mem::transmute, task::Waker}; -use uring::{ReactorUring, UringIo}; +use std::task::Waker; + +use uring::{MultishotUringIo, OneshotUringIo, ReactorUring}; mod uring; -pub type ReactorIo = UringIo<'static, Waker>; +pub type ReactorIo = OneshotUringIo; +pub type MultishotReactorIo = MultishotUringIo; pub(crate) struct Reactor {} @@ -13,11 +15,11 @@ thread_local! { impl Reactor { pub fn new_io() -> ReactorIo { - REACTOR.with(|r| unsafe { transmute(r.new_io()) }) + REACTOR.with(|r| r.new_oneshot_io()) } - pub fn new_multishot_io() -> ReactorIo { - REACTOR.with(|r| unsafe { transmute(r.new_multishot_io()) }) + pub fn new_multishot_io() -> MultishotReactorIo { + REACTOR.with(|r| r.new_multishot_io()) } pub fn react() { diff --git a/src/reactor/uring.rs b/src/reactor/uring.rs index 21af6a1..d23f7ea 100644 --- a/src/reactor/uring.rs +++ b/src/reactor/uring.rs @@ -1,29 +1,32 @@ -pub(crate) use io::UringIo; -use io_uring::{squeue, CompletionQueue, IoUring}; +pub(crate) use io::{multishot::MultishotUringIo, oneshot::OneshotUringIo}; +use io_uring::{cqueue, squeue, CompletionQueue, IoUring}; use result::RingResults; use slab::Slab; -use std::cell::{RefCell, RefMut}; +use std::{ + cell::{RefCell, RefMut}, + rc::Rc, +}; mod io; mod result; pub struct ReactorUring { - inner: RefCell>, + inner: Rc>>, } impl ReactorUring { pub fn new() -> Self { Self { - inner: RefCell::new(ReactorInner::new()), + inner: Rc::new(RefCell::new(ReactorInner::new())), } } - pub fn new_io(&self) -> UringIo<'_, T> { - UringIo::new(&self.inner, IoKind::Oneshot) + pub fn new_oneshot_io(&self) -> OneshotUringIo { + OneshotUringIo::new(self.inner.clone()) } - pub fn new_multishot_io(&self) -> UringIo<'_, T> { - UringIo::new(&self.inner, IoKind::Multi) + pub fn new_multishot_io(&self) -> MultishotUringIo { + MultishotUringIo::new(self.inner.clone()) } pub fn react(&self) -> IoCompletionIter<'_, T> { @@ -47,7 +50,7 @@ impl ReactorUring { } } -struct ReactorInner { +pub(crate) struct ReactorInner { uring: IoUring, pending: Slab>, results: RingResults, @@ -75,8 +78,11 @@ impl ReactorInner { } } - fn submit_io(&mut self, entry: squeue::Entry, obj: T, kind: IoKind) -> usize { - let result_slab_idx = self.results.get(kind).create_slot(); + fn submit_io(&mut self, entry: squeue::Entry, obj: T, kind: IoKind) -> (u64, usize) { + let result_slab_idx = match kind { + IoKind::Oneshot => self.results.get_oneshot().create_slot(), + IoKind::Multi => self.results.get_multishot().create_slot(), + }; let slot = self.pending.insert(PendingIo { assoc_obj: obj, @@ -91,7 +97,7 @@ impl ReactorInner { .unwrap(); } - result_slab_idx + (slot as u64, result_slab_idx) } } @@ -113,13 +119,21 @@ impl Iterator for IoCompletionIter<'_, T> { .unwrap() .clone(); - self.ring - .results - .get(pending_io.kind) - .set_result(entry.result(), pending_io.result_slab_idx); - - if let IoKind::Oneshot = pending_io.kind { - self.ring.pending.remove(entry.user_data() as usize); + match pending_io.kind { + IoKind::Oneshot => { + self.ring + .results + .get_oneshot() + .set_result(entry.result(), pending_io.result_slab_idx); + self.ring.pending.remove(entry.user_data() as usize); + } + IoKind::Multi => { + let results = self.ring.results.get_multishot(); + results.push_result(entry.result(), pending_io.result_slab_idx); + if !cqueue::more(entry.flags()) { + results.set_finished(pending_io.result_slab_idx); + } + } } Some(pending_io.assoc_obj) @@ -189,7 +203,7 @@ mod tests { run_test(|a, b, uring| { let mut buf = [0]; - let mut io = uring.new_io(); + let mut io = uring.new_oneshot_io(); let result = io.submit_or_get_result(|| { ( opcode::Read::new(types::Fd(a.as_raw_fd()), buf.as_mut_ptr(), 1).build(), @@ -224,7 +238,7 @@ mod tests { run_test(|a, b, uring| { let mut buf = [0]; - let mut io = uring.new_io(); + let mut io = uring.new_oneshot_io(); assert!(matches!( io.submit_or_get_result(|| { ( @@ -255,7 +269,7 @@ mod tests { run_test(|a, b, uring| { let buf = [0]; - let mut io = uring.new_io(); + let mut io = uring.new_oneshot_io(); let result = io.submit_or_get_result(|| { ( opcode::Write::new(types::Fd(a.as_raw_fd()), buf.as_ptr(), buf.len() as _) @@ -293,7 +307,7 @@ mod tests { run_test(|a, b, uring| { let mut buf = [0, 0]; - let mut io1 = uring.new_io(); + let mut io1 = uring.new_oneshot_io(); assert!(matches!( io1.submit_or_get_result(|| { ( @@ -304,7 +318,7 @@ mod tests { Poll::Pending )); - let mut io2 = uring.new_io(); + let mut io2 = uring.new_oneshot_io(); assert!(matches!( io2.submit_or_get_result(|| { ( @@ -344,7 +358,7 @@ mod tests { run_test(|a, b, uring| { let buf = [0xbe, 0xef]; - let mut io1 = uring.new_io(); + let mut io1 = uring.new_oneshot_io(); assert!(matches!( io1.submit_or_get_result(|| { ( @@ -355,7 +369,7 @@ mod tests { Poll::Pending )); - let mut io2 = uring.new_io(); + let mut io2 = uring.new_oneshot_io(); assert!(matches!( io2.submit_or_get_result(|| { ( diff --git a/src/reactor/uring/io.rs b/src/reactor/uring/io.rs deleted file mode 100644 index 8af111b..0000000 --- a/src/reactor/uring/io.rs +++ /dev/null @@ -1,80 +0,0 @@ -use std::{cell::RefCell, task::Poll}; - -use super::{IoKind, ReactorInner}; -use io_uring::squeue; - -#[derive(Debug)] -enum IoState { - New, - Submitted(usize), - Finished(i32), -} - -pub(crate) struct UringIo<'a, T> { - state: IoState, - kind: IoKind, - ring: &'a RefCell>, -} - -impl From<&IoState> for Poll> { - fn from(value: &IoState) -> Self { - match value { - IoState::New => Poll::Pending, - IoState::Submitted(_) => Poll::Pending, - IoState::Finished(result) => Poll::Ready(if *result < 0 { - Err(std::io::Error::from_raw_os_error(result.abs())) - } else { - Ok(*result) - }), - } - } -} - -impl<'a, T> UringIo<'a, T> { - pub(super) fn new(ring: &'a RefCell>, kind: IoKind) -> Self { - Self { - state: IoState::New, - kind, - ring, - } - } - - pub fn submit_or_get_result( - &mut self, - f: impl FnOnce() -> (squeue::Entry, T), - ) -> Poll> { - match self.state { - IoState::New => { - let (entry, obj) = f(); - let result_slot = self.ring.borrow_mut().submit_io(entry, obj, self.kind); - self.state = IoState::Submitted(result_slot); - } - IoState::Submitted(slot) => { - let mut ring = self.ring.borrow_mut(); - let result_store = ring.results.get(self.kind); - - if let Some(res) = result_store.pop_result(slot) { - match self.kind { - IoKind::Oneshot => self.state = IoState::Finished(res), - IoKind::Multi => return (&IoState::Finished(res)).into(), - }; - } - } - IoState::Finished(_) => {} - } - - (&self.state).into() - } -} - -impl<'a, T> Drop for UringIo<'a, T> { - fn drop(&mut self) { - if let IoState::Submitted(slot) = self.state { - self.ring - .borrow_mut() - .results - .get(self.kind) - .drop_result(slot); - } - } -} diff --git a/src/reactor/uring/io/mod.rs b/src/reactor/uring/io/mod.rs new file mode 100644 index 0000000..43ffd9a --- /dev/null +++ b/src/reactor/uring/io/mod.rs @@ -0,0 +1,10 @@ +pub mod multishot; +pub mod oneshot; + +fn reactor_value_to_result(v: i32) -> std::io::Result { + if v < 0 { + Err(std::io::Error::from_raw_os_error(v.abs())) + } else { + Ok(v) + } +} diff --git a/src/reactor/uring/io/multishot.rs b/src/reactor/uring/io/multishot.rs new file mode 100644 index 0000000..3265d8e --- /dev/null +++ b/src/reactor/uring/io/multishot.rs @@ -0,0 +1,67 @@ +use std::{cell::RefCell, rc::Rc, task::Poll}; + +use io_uring::{squeue, types::CancelBuilder}; + +use crate::reactor::uring::{result::MultishotResult, IoKind, ReactorInner}; + +use super::reactor_value_to_result; + +#[derive(Debug)] +enum IoState { + New, + Submitted(usize, u64), +} + +pub(crate) struct MultishotUringIo { + state: IoState, + ring: Rc>>, +} + +impl MultishotUringIo { + pub(crate) fn new(ring: Rc>>) -> Self { + Self { + state: IoState::New, + ring, + } + } + + pub fn submit_or_get_result( + &mut self, + f: impl FnOnce() -> (squeue::Entry, T), + ) -> Poll>> { + match self.state { + IoState::New => { + let (entry, obj) = f(); + let (user_data, result_slot) = + self.ring.borrow_mut().submit_io(entry, obj, IoKind::Multi); + self.state = IoState::Submitted(result_slot, user_data); + Poll::Pending + } + IoState::Submitted(slot, _) => { + let mut ring = self.ring.borrow_mut(); + let result_store = ring.results.get_multishot(); + + match result_store.pop_result(slot) { + MultishotResult::Value(v) => Poll::Ready(Some(reactor_value_to_result(v))), + MultishotResult::Pending => Poll::Pending, + MultishotResult::Finished => Poll::Ready(None), + } + } + } + } +} + +impl Drop for MultishotUringIo { + fn drop(&mut self) { + if let IoState::Submitted(slot, user_data) = self.state { + let mut ring = self.ring.borrow_mut(); + + ring.uring + .submitter() + .register_sync_cancel(None, CancelBuilder::user_data(user_data)) + .expect("Should be able to cancel in-flight multishot IO"); + + ring.results.get_multishot().drop_result(slot); + } + } +} diff --git a/src/reactor/uring/io/oneshot.rs b/src/reactor/uring/io/oneshot.rs new file mode 100644 index 0000000..99d3282 --- /dev/null +++ b/src/reactor/uring/io/oneshot.rs @@ -0,0 +1,75 @@ +use std::{cell::RefCell, rc::Rc, task::Poll}; + +use io_uring::squeue; + +use crate::reactor::uring::{IoKind, ReactorInner}; + +use super::reactor_value_to_result; + +#[derive(Debug)] +enum IoState { + New, + Submitted(usize), + Finished(i32), +} + +pub(crate) struct OneshotUringIo { + state: IoState, + ring: Rc>>, +} + +impl From<&IoState> for Poll> { + fn from(value: &IoState) -> Self { + match value { + IoState::New => Poll::Pending, + IoState::Submitted(_) => Poll::Pending, + IoState::Finished(result) => Poll::Ready(reactor_value_to_result(*result)), + } + } +} + +impl OneshotUringIo { + pub fn new(ring: Rc>>) -> Self { + Self { + state: IoState::New, + ring, + } + } + + pub fn submit_or_get_result( + &mut self, + f: impl FnOnce() -> (squeue::Entry, T), + ) -> Poll> { + match self.state { + IoState::New => { + let (entry, obj) = f(); + let (_, result_slot) = + self.ring + .borrow_mut() + .submit_io(entry, obj, IoKind::Oneshot); + self.state = IoState::Submitted(result_slot); + } + IoState::Submitted(slot) => { + let mut ring = self.ring.borrow_mut(); + let result_store = ring.results.get_oneshot(); + + if let Some(res) = result_store.get_result(slot) { + self.state = IoState::Finished(res); + } + } + IoState::Finished(_) => {} + } + + (&self.state).into() + } +} + +impl Drop for OneshotUringIo { + fn drop(&mut self) { + if let IoState::Submitted(slot) = self.state { + let mut ring = self.ring.borrow_mut(); + + ring.results.get_oneshot().drop_result(slot); + } + } +} diff --git a/src/reactor/uring/result.rs b/src/reactor/uring/result.rs index 952666d..5b521e1 100644 --- a/src/reactor/uring/result.rs +++ b/src/reactor/uring/result.rs @@ -1,22 +1,13 @@ use ringbuffer::{ConstGenericRingBuffer, RingBuffer}; use slab::Slab; -use super::IoKind; - -pub trait ResultStore { - fn set_result(&mut self, result: i32, idx: usize); - fn pop_result(&mut self, idx: usize) -> Option; - fn drop_result(&mut self, idx: usize); - fn create_slot(&mut self) -> usize; -} - pub(super) enum ResultState { Pending, Set(i32), Dropped, } -struct OneshotStore(Slab); +pub(crate) struct OneshotStore(Slab); impl OneshotStore { pub fn new() -> Self { @@ -27,10 +18,8 @@ impl OneshotStore { pub fn is_empty(&self) -> bool { self.0.is_empty() } -} -impl ResultStore for OneshotStore { - fn set_result(&mut self, result: i32, idx: usize) { + pub fn set_result(&mut self, result: i32, idx: usize) { let r_entry = self.0.get_mut(idx).unwrap(); if matches!(r_entry, ResultState::Dropped) { @@ -40,7 +29,7 @@ impl ResultStore for OneshotStore { } } - fn pop_result(&mut self, idx: usize) -> Option { + pub fn get_result(&mut self, idx: usize) -> Option { let res = match self.0.get(idx).unwrap() { ResultState::Pending => None, ResultState::Set(result) => { @@ -54,7 +43,7 @@ impl ResultStore for OneshotStore { res } - fn drop_result(&mut self, idx: usize) { + pub fn drop_result(&mut self, idx: usize) { let r_entry = self.0.get_mut(idx).unwrap(); if matches!(r_entry, ResultState::Set(_)) { @@ -64,17 +53,24 @@ impl ResultStore for OneshotStore { } } - fn create_slot(&mut self) -> usize { + pub fn create_slot(&mut self) -> usize { self.0.insert(ResultState::Pending) } } -enum MultishotResultState { - Active(ConstGenericRingBuffer), - Dropped, +struct MultishotResultState { + results: ConstGenericRingBuffer, + dropped: bool, + finished: bool, } -struct MultishotStore(Slab); +pub enum MultishotResult { + Value(i32), + Pending, + Finished, +} + +pub(crate) struct MultishotStore(Slab); impl MultishotStore { fn new() -> Self { @@ -85,37 +81,48 @@ impl MultishotStore { pub fn is_empty(&self) -> bool { self.0.is_empty() } -} -impl ResultStore for MultishotStore { - fn set_result(&mut self, result: i32, idx: usize) { - let r_entry = self.0.get_mut(idx).unwrap(); + pub fn push_result(&mut self, result: i32, idx: usize) { + self.0.get_mut(idx).unwrap().results.push(result); + } - match r_entry { - MultishotResultState::Active(ref mut ring) => { - ring.push(result); + pub fn pop_result(&mut self, idx: usize) -> MultishotResult { + let result = self.0.get_mut(idx).unwrap(); + + match result.results.dequeue() { + Some(v) => MultishotResult::Value(v), + None => { + if result.finished { + MultishotResult::Finished + } else { + MultishotResult::Pending + } } - - // If the IO has been dropped, ignore any results. - MultishotResultState::Dropped => {} } } - fn pop_result(&mut self, idx: usize) -> Option { - match self.0.get_mut(idx).unwrap() { - MultishotResultState::Active(ref mut ring) => ring.dequeue(), - MultishotResultState::Dropped => panic!("Shoult not be able to get a dropped result"), + pub fn drop_result(&mut self, idx: usize) { + if self.0.get_mut(idx).unwrap().finished { + self.0.remove(idx); + } else { + self.0.get_mut(idx).unwrap().dropped = true; } } - fn drop_result(&mut self, idx: usize) { - let r_entry = self.0.get_mut(idx).unwrap(); - *r_entry = MultishotResultState::Dropped; + pub fn create_slot(&mut self) -> usize { + self.0.insert(MultishotResultState { + results: ConstGenericRingBuffer::new(), + dropped: false, + finished: false, + }) } - fn create_slot(&mut self) -> usize { - self.0 - .insert(MultishotResultState::Active(ConstGenericRingBuffer::new())) + pub fn set_finished(&mut self, idx: usize) { + if self.0.get(idx).unwrap().dropped { + self.0.remove(idx); + } else { + self.0.get_mut(idx).unwrap().finished = true; + } } } @@ -132,11 +139,12 @@ impl RingResults { } } - pub fn get(&mut self, kind: IoKind) -> &mut dyn ResultStore { - match kind { - IoKind::Oneshot => &mut self.oneshot, - IoKind::Multi => &mut self.multishot, - } + pub fn get_oneshot(&mut self) -> &mut OneshotStore { + &mut self.oneshot + } + + pub fn get_multishot(&mut self) -> &mut MultishotStore { + &mut self.multishot } #[cfg(test)]