Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rio"
version = "0.9.2"
version = "0.9.3"
authors = ["Tyler Neely <t@jujit.su>"]
edition = "2018"
description = "GPL-3.0 nice bindings for io_uring. MIT/Apache-2.0 license is available for spacejam's github sponsors."
Expand Down
11 changes: 9 additions & 2 deletions src/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ impl Default for CompletionState {
}

/// A Future value which may or may not be filled
///
/// # Safety
///
/// Never call `std::mem::forget` on this value.
/// It can lead to a use-after-free bug. The fact
/// that `std::mem::forget` is not marked unsafe
/// is a bug in the Rust standard library.
Comment on lines +33 to +38
Copy link

@bb010g bb010g Oct 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't std::mem::forget actually being safe, against pre-1.0 thoughts/desires, the whole issue with the Leakpocalypse?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nevermind, this documentation was covered & changed by #30 & #31.

#[derive(Debug)]
pub struct Completion<'a, C: FromCqe> {
lifetime: PhantomData<&'a C>,
Expand Down Expand Up @@ -97,9 +104,9 @@ impl<'a, C: FromCqe> Completion<'a, C> {
inner = self.cv.wait(inner).unwrap();
}

return inner.item.take().map(|io_result| {
inner.item.take().map(|io_result| {
io_result.map(FromCqe::from_cqe)
});
})
}
}

Expand Down
46 changes: 27 additions & 19 deletions src/io_uring/cq.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#![allow(unsafe_code)]

use std::slice::from_raw_parts_mut;

use super::*;

/// Consumes uring completions.
#[derive(Debug)]
pub struct Cq {
khead: &'static AtomicU32,
ktail: &'static AtomicU32,
kring_mask: &'static u32,
koverflow: &'static AtomicU32,
cqes: &'static mut [io_uring_cqe],
khead: *mut AtomicU32,
ktail: *mut AtomicU32,
kring_mask: *mut u32,
koverflow: *mut AtomicU32,
cqes: *mut [io_uring_cqe],
ticket_queue: Arc<TicketQueue>,
in_flight: Arc<InFlight>,
ring_ptr: *const libc::c_void,
Expand Down Expand Up @@ -54,18 +56,18 @@ impl Cq {
Cq {
ring_ptr: cq_ring_ptr,
ring_mmap_sz: cq_ring_mmap_sz,
khead: &*(cq_ring_ptr
khead: cq_ring_ptr
.add(params.cq_off.head as usize)
as *const AtomicU32),
ktail: &*(cq_ring_ptr
as *mut AtomicU32,
ktail: cq_ring_ptr
.add(params.cq_off.tail as usize)
as *const AtomicU32),
kring_mask: &*(cq_ring_ptr
as *mut AtomicU32,
kring_mask: cq_ring_ptr
.add(params.cq_off.ring_mask as usize)
as *const u32),
koverflow: &*(cq_ring_ptr
as *mut u32,
koverflow: cq_ring_ptr
.add(params.cq_off.overflow as usize)
as *const AtomicU32),
as *mut AtomicU32,
cqes: from_raw_parts_mut(
cq_ring_ptr
.add(params.cq_off.cqes as usize)
Expand Down Expand Up @@ -95,7 +97,12 @@ impl Cq {
if let Err(e) = block_for_cqe(ring_fd) {
panic!("error in cqe reaper: {:?}", e);
} else {
assert_eq!(self.koverflow.load(Relaxed), 0);
assert_eq!(
unsafe {
(*self.koverflow).load(Relaxed)
},
0
);
if self.reap_ready_cqes().is_none() {
// poison pill detected, time to shut down
return;
Expand All @@ -106,8 +113,9 @@ impl Cq {

fn reap_ready_cqes(&mut self) -> Option<usize> {
let _ = Measure::new(&M.reap_ready);
let mut head = self.khead.load(Acquire);
let tail = self.ktail.load(Acquire);
let mut head =
unsafe { &*self.khead }.load(Acquire);
let tail = unsafe { &*self.ktail }.load(Acquire);
let count = tail - head;

// hack to get around mutable usage in loop
Expand All @@ -119,8 +127,8 @@ impl Cq {

while head != tail {
let cq = cq_opt.take().unwrap();
let index = head & cq.kring_mask;
let cqe = &cq.cqes[index as usize];
let index = head & unsafe { *cq.kring_mask };
let cqe = &unsafe { &*cq.cqes }[index as usize];

// we detect a poison pill by seeing if
// the user_data is really big, which it
Expand Down Expand Up @@ -148,7 +156,7 @@ impl Cq {

completion_filler.fill(result);

cq.khead.fetch_add(1, Release);
unsafe { &*cq.khead }.fetch_add(1, Release);
cq_opt = Some(cq);
head += 1;

Expand Down
62 changes: 35 additions & 27 deletions src/io_uring/sq.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#![allow(unsafe_code)]

use std::slice::from_raw_parts_mut;

use super::*;

/// Sprays uring submissions.
#[derive(Debug)]
pub(crate) struct Sq {
khead: &'static AtomicU32,
ktail: &'static AtomicU32,
kring_mask: &'static u32,
kflags: &'static AtomicU32,
kdropped: &'static AtomicU32,
khead: *mut AtomicU32,
ktail: *mut AtomicU32,
kring_mask: *mut u32,
kflags: *mut AtomicU32,
kdropped: *mut AtomicU32,
array: &'static mut [AtomicU32],
sqes: &'static mut [io_uring_sqe],
sqe_head: u32,
Expand Down Expand Up @@ -64,39 +66,38 @@ impl Sq {
IORING_OFF_SQES,
)? as _;

#[allow(unsafe_code)]
Ok(unsafe {
Sq {
sqe_head: 0,
sqe_tail: 0,
ring_ptr: sq_ring_ptr,
ring_mmap_sz: sq_ring_mmap_sz,
sqes_mmap_sz,
sqes: from_raw_parts_mut(
sqes_ptr,
params.sq_entries as usize,
),
khead: &*(sq_ring_ptr
khead: sq_ring_ptr
.add(params.sq_off.head as usize)
as *const AtomicU32),
ktail: &*(sq_ring_ptr
as *mut AtomicU32,
ktail: sq_ring_ptr
.add(params.sq_off.tail as usize)
as *const AtomicU32),
kring_mask: &*(sq_ring_ptr
as *mut AtomicU32,
kring_mask: sq_ring_ptr
.add(params.sq_off.ring_mask as usize)
as *const u32),
kflags: &*(sq_ring_ptr
as *mut u32,
kflags: sq_ring_ptr
.add(params.sq_off.flags as usize)
as *const AtomicU32),
kdropped: &*(sq_ring_ptr
as *mut AtomicU32,
kdropped: sq_ring_ptr
.add(params.sq_off.dropped as usize)
as *const AtomicU32),
as *mut AtomicU32,
array: from_raw_parts_mut(
sq_ring_ptr
.add(params.sq_off.array as usize)
as _,
params.sq_entries as usize,
),
sqes: from_raw_parts_mut(
sqes_ptr,
params.sq_entries as usize,
),
}
})
}
Expand All @@ -113,11 +114,12 @@ impl Sq {
self.sqe_head
} else {
// polling mode
self.khead.load(Acquire)
unsafe { &*self.khead }.load(Acquire)
};

if next - head <= self.sqes.len() as u32 {
let idx = self.sqe_tail & self.kring_mask;
let idx =
self.sqe_tail & unsafe { *self.kring_mask };
let sqe = &mut self.sqes[idx as usize];
self.sqe_tail = next;

Expand All @@ -129,10 +131,11 @@ impl Sq {

// sets sq.array to point to current sq.sqe_head
fn flush(&mut self) -> u32 {
let mask: u32 = *self.kring_mask;
let mask: u32 = unsafe { *self.kring_mask };
let to_submit = self.sqe_tail - self.sqe_head;

let mut ktail = self.ktail.load(Acquire);
let mut ktail =
unsafe { &*self.ktail }.load(Acquire);

for _ in 0..to_submit {
let index = ktail & mask;
Expand All @@ -142,7 +145,9 @@ impl Sq {
self.sqe_head += 1;
}

let swapped = self.ktail.swap(ktail, Release);
let swapped =
unsafe { &*self.ktail }.swap(ktail, Release);

assert_eq!(swapped, ktail - to_submit);

to_submit
Expand Down Expand Up @@ -180,7 +185,7 @@ impl Sq {
to_submit -= u32::try_from(ret).unwrap();
}
flushed
} else if self.kflags.load(Acquire)
} else if unsafe { &*self.kflags }.load(Acquire)
& IORING_SQ_NEED_WAKEUP
!= 0
{
Expand All @@ -205,7 +210,10 @@ impl Sq {
} else {
0
};
assert_eq!(self.kdropped.load(Relaxed), 0);
assert_eq!(
unsafe { &*self.kdropped }.load(Relaxed),
0
);
u64::from(submitted)
}
}
37 changes: 23 additions & 14 deletions src/io_uring/uring.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use std::os::unix::io::{AsRawFd, IntoRawFd};

/// Nice bindings for the shiny new linux IO system
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -71,8 +70,8 @@ impl Uring {
ring_fd,
sq: Mutex::new(sq),
config,
in_flight: in_flight,
ticket_queue: ticket_queue,
in_flight,
ticket_queue,
loaded: 0.into(),
submitted: 0.into(),
}
Expand Down Expand Up @@ -148,7 +147,9 @@ impl Uring {
address: &std::net::SocketAddr,
order: Ordering,
) -> Completion<'a, ()>
where F: AsRawFd {
where
F: AsRawFd,
{
let (addr, len) = addr2raw(address);
self.with_sqe(None, false, |sqe| {
sqe.prep_rw(
Expand Down Expand Up @@ -729,15 +730,23 @@ impl Uring {
}
}

fn addr2raw(addr: &std::net::SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
match *addr {
std::net::SocketAddr::V4(ref a) => {
let b: *const std::net::SocketAddrV4 = a;
(b as *const _, std::mem::size_of_val(a) as libc::socklen_t)
}
std::net::SocketAddr::V6(ref a) => {
let b: *const std::net::SocketAddrV6 = a;
(b as *const _, std::mem::size_of_val(a) as libc::socklen_t)
fn addr2raw(
addr: &std::net::SocketAddr,
) -> (*const libc::sockaddr, libc::socklen_t) {
match *addr {
std::net::SocketAddr::V4(ref a) => {
let b: *const std::net::SocketAddrV4 = a;
(
b as *const _,
std::mem::size_of_val(a) as libc::socklen_t,
)
}
std::net::SocketAddr::V6(ref a) => {
let b: *const std::net::SocketAddrV6 = a;
(
b as *const _,
std::mem::size_of_val(a) as libc::socklen_t,
)
}
}
}
}
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,5 @@ impl FromCqe for usize {
}

impl FromCqe for () {
fn from_cqe(_: io_uring::io_uring_cqe) -> () {
()
}
fn from_cqe(_: io_uring::io_uring_cqe) {}
}