Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ keywords = ["io", "async", "non-blocking", "futures"]
io-uring = "0.7.4"
libc = "0.2.167"
log = "0.4.22"
ringbuffer = "0.15.0"
slab = "0.4.9"
tokio-stream = { version = "0.1.17", default-features = false }

[dev-dependencies]
anyhow = "1.0.81"
Expand Down
6 changes: 4 additions & 2 deletions examples/tcp_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::{Context, Result};
use log::debug;
use tokio_stream::StreamExt;
use trale::{
futures::{read::AsyncRead, tcp::TcpListener, timer::Timer, write::AsyncWrite},
task::{Executor, TaskJoiner},
Expand All @@ -24,13 +25,14 @@ fn main() -> Result<()> {
let echo_task: TaskJoiner<Result<usize>> = Executor::spawn(async {
let mut buf = [0u8; 1];
let mut bytes_read: usize = 0;
let listener = TcpListener::bind("127.0.0.1:5000").context("Could not bind")?;
let mut listener = TcpListener::bind("127.0.0.1:5000").context("Could not bind")?;

println!("Waiting for connection on 127.0.0.1:5000");

let mut conn = listener
.accept()
.next()
.await
.unwrap()
.context("Could not accept incoming connection")?;

// We only want to accept a single connection. Drop the lisetner once
Expand Down
4 changes: 2 additions & 2 deletions examples/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ fn main() {
});

Executor::spawn(async {
let mut buf = [0xadu8; 20];
let buf = [0xadu8; 20];
let udpsock = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
Timer::sleep(Duration::from_secs(1)).unwrap().await;
let len = udpsock
.send_to(&mut buf, (Ipv4Addr::LOCALHOST, 9998))
.send_to(&buf, (Ipv4Addr::LOCALHOST, 9998))
.await
.unwrap();

Expand Down
16 changes: 8 additions & 8 deletions examples/web_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
path::PathBuf,
sync::OnceLock,
};
use tokio_stream::StreamExt;
use trale::{
futures::{
fs::File,
Expand Down Expand Up @@ -133,12 +134,10 @@ async fn handle_connection(mut conn: TcpStream) -> anyhow::Result<()> {

let file = if path == PathBuf::from("/") {
ARGS.get().unwrap().webroot.join("index.html")
} else if let Ok(path) = path.strip_prefix("/") {
ARGS.get().unwrap().webroot.join(path)
} else {
if let Ok(path) = path.strip_prefix("/") {
ARGS.get().unwrap().webroot.join(path)
} else {
return send_response_hdr(&mut conn, Response::NotFound, 0).await;
}
return send_response_hdr(&mut conn, Response::NotFound, 0).await;
};

send_file(conn, file).await
Expand All @@ -151,12 +150,12 @@ fn main() -> anyhow::Result<()> {

ARGS.set(args).expect("Should have never been set");

let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ARGS.get().unwrap().port))
let mut listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ARGS.get().unwrap().port))
.context("Could not setup socket listener")?;

Executor::block_on(async move {
loop {
match listener.accept().await {
while let Some(conn) = listener.next().await {
match conn {
Ok(conn) => {
Executor::spawn(async {
if let Err(e) = handle_connection(conn).await {
Expand All @@ -167,6 +166,7 @@ fn main() -> anyhow::Result<()> {
Err(e) => error!("Could not accept incoming connection: {e:?}"),
}
}
eprintln!("Bye!");
});

Ok(())
Expand Down
79 changes: 38 additions & 41 deletions src/futures/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
//! use trale::futures::tcp::TcpListener;
//! use trale::futures::read::AsyncRead;
//! use trale::futures::write::AsyncWrite;
//! use tokio_stream::StreamExt;
//! async {
//! let listener = TcpListener::bind("0.0.0.0:8888")?;
//! let mut sock = listener.accept().await?;
//! let mut listener = TcpListener::bind("0.0.0.0:8888")?;
//! while let Some(Ok(mut sock)) = listener.next().await {
//! let mut buf = [0u8; 1];
//! loop {
//! let len = sock.read(&mut buf).await?;
//! if len == 0 {
//! return Ok(());
//! loop {
//! let len = sock.read(&mut buf).await?;
//! if len == 0 {
//! return Ok(());
//! }
//! sock.write(&buf).await?;
//! }
//! sock.write(&buf).await?;
//! }
//!# Ok::<(), std::io::Error>(())
//! };
Expand All @@ -33,14 +35,14 @@ use std::{
net::{SocketAddr, ToSocketAddrs},
os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd},
pin::Pin,
ptr::null_mut,
task::{Context, Poll},
};

use io_uring::{opcode, types};
use libc::{AF_INET, AF_INET6, SOCK_STREAM};
use tokio_stream::Stream;

use crate::reactor::{Reactor, ReactorIo};
use crate::reactor::{MultishotReactorIo, Reactor, ReactorIo};

use super::{
read::{AsyncRead, AsyncReader},
Expand All @@ -54,15 +56,7 @@ use super::{
/// accept connections on the specified address.
pub struct TcpListener {
inner: OwnedFd,
}

/// A future for accepting new connections.
///
/// Call `.await` to pause the current task until a new connection has been
/// established.
pub struct Acceptor<'fd> {
inner: BorrowedFd<'fd>,
io: ReactorIo,
io: MultishotReactorIo,
}

fn mk_sock(addr: &SocketAddr) -> std::io::Result<OwnedFd> {
Expand All @@ -85,8 +79,11 @@ impl TcpListener {
/// This function will create a new socket, bind it to one of the specified
/// `addrs` and returna [TcpListener]. If binding to *all* of the specified
/// addresses fails then the reason for failing to bind to the *last*
/// address is returned. Otherwise, use [TcpListener::accept] to obtain a
/// future to accept new connections.
/// address is returned. Otherwise, `.await` on the listener to await a new
/// connection.
///
/// When this future returns None, the listener will no long accept any
/// further connections and should be dropped.
pub fn bind(addrs: impl ToSocketAddrs) -> std::io::Result<Self> {
let addrs = addrs.to_socket_addrs()?;
let mut last_err = ErrorKind::NotFound.into();
Expand All @@ -102,38 +99,38 @@ impl TcpListener {

match unsafe { libc::listen(sock.as_raw_fd(), 1024) } {
-1 => last_err = std::io::Error::last_os_error(),
0 => return Ok(Self { inner: sock }),
0 => {
return Ok(Self {
inner: sock,
io: Reactor::new_multishot_io(),
})
}
_ => unreachable!("listen() cannot return a value other than 0 or -1"),
}
}

Err(last_err)
}

/// Return a future for accepting a new connection.
///
/// You can `.await` the returned future to wait for a new incoming
/// connection. Once a connection has been successfully established, a new
/// [TcpStream] is returned which is connected to the peer.
pub fn accept(&self) -> Acceptor {
Acceptor {
inner: self.inner.as_fd(),
io: Reactor::new_io(),
}
}
}

impl Future for Acceptor<'_> {
type Output = std::io::Result<TcpStream>;
impl Stream for TcpListener {
type Item = std::io::Result<TcpStream>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let entry = opcode::Accept::new(types::Fd(self.inner.as_raw_fd()), null_mut(), null_mut());
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

self.io
.submit_or_get_result(|| (entry.build(), cx.waker().clone()))
this.io
.submit_or_get_result(|| {
(
opcode::AcceptMulti::new(types::Fd(this.inner.as_raw_fd())).build(),
cx.waker().clone(),
)
})
.map(|x| {
x.map(|fd| TcpStream {
inner: unsafe { OwnedFd::from_raw_fd(fd) },
x.map(|x| {
x.map(|fd| TcpStream {
inner: unsafe { OwnedFd::from_raw_fd(fd) },
})
})
})
}
Expand Down
Loading