diff --git a/Cargo.lock b/Cargo.lock index 6dfd8d9..c72efda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1272,6 +1272,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "vodozemac", "webrtc", ] @@ -1595,6 +1596,7 @@ name = "p2p" version = "0.1.0" dependencies = [ "bitflags 2.9.1", + "bytes", "chrono", "error", "serde", diff --git a/discover/src/websocket.rs b/discover/src/websocket.rs index 44c8db2..c17daab 100644 --- a/discover/src/websocket.rs +++ b/discover/src/websocket.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use crate::models::phoenix::Message as PhxMessage; -use crate::models::response::{Response, Status}; type _Sender = SplitSink>, Message>; @@ -27,7 +26,6 @@ type Reader = const DEFAULT_QUEUED_MESSAGE: usize = 32; const SOCKET_PATH: &str = "/socket/websocket"; -const AUTH_PATH: &str = "/api/auth"; /// WebSocket client manager. #[allow(dead_code)] @@ -73,12 +71,6 @@ pub struct WebSocket { max_queued_message: usize, } -#[derive(Debug, Serialize)] -struct Auth { - vanity: String, - password: Option, -} - impl WebSocket { /// Create a new [`WebSocket`] without connecting it. pub fn new>(url: T) -> Result { @@ -96,22 +88,10 @@ impl WebSocket { }) } - fn get_scheme(&self, base: &str) -> String { - match self.url.scheme() { - "https" | "wss" => format!("{base}s"), - _ => base.to_owned(), - } - } - /// Establish the WebSocket connection. /// - /// First, it makes an HTTP request to get the JWT. - /// Then, it connects to WebSocket using the token. - pub async fn connect( - &mut self, - identifier: T, - password: Option, - ) -> Result<()> { + /// Uses pre-generated JWT by Turms. + pub async fn connect(&mut self, token: String) -> Result<()> { // Ensure the URL has a valid host. let host = { let host_str = self @@ -125,29 +105,10 @@ impl WebSocket { } }; - let scheme = self.get_scheme("http"); - let url = format!("{scheme}://{host}{AUTH_PATH}"); - - // Send request and get token. - let token = reqwest::Client::new() - .post(&url) - .json(&Auth { - vanity: identifier.to_string(), - password: password.map(|p| p.to_string()), - }) - .send() - .await? - .json::() - .await?; - - if token.status == Status::Error || token.data.is_empty() { - return Err(Error::AuthenticationFailed); - } - // Establish WebSocket connection. - let scheme = self.get_scheme("ws"); + let scheme = self.url.scheme(); let socket_url = - format!("{scheme}://{host}{SOCKET_PATH}?token={}", token.data); + format!("{scheme}://{host}{SOCKET_PATH}?token={token}"); let (mut socket, _response) = connect_async(&socket_url).await?; diff --git a/libturms/Cargo.toml b/libturms/Cargo.toml index f4b9004..69e037d 100644 --- a/libturms/Cargo.toml +++ b/libturms/Cargo.toml @@ -22,8 +22,10 @@ bench = false discover = { path = "../discover" } p2p = { path = "../p2p" } error = { path = "../error" } +vodozemac = "0.9" serde = { workspace = true } serde_yaml = "0.9" +serde_json = "1" webrtc = { workspace = true } tokio = { workspace = true, features = ["full"] } futures-util = "0.3" @@ -32,7 +34,6 @@ tracing = "0.1" [dev-dependencies] anyhow = "*" serde = "*" -serde_json = "1" tokio = { version = "1", features = ["full"] } futures-util = "0.3" tracing = "0.1" diff --git a/libturms/examples/libturms/answer.rs b/libturms/examples/libturms/answer.rs index 3ffdc11..6a21db4 100644 --- a/libturms/examples/libturms/answer.rs +++ b/libturms/examples/libturms/answer.rs @@ -33,9 +33,9 @@ async fn main() { let mut managed_turms = Turms::from_config(libturms::ConfigFinder::::Text(config)) .unwrap() - .connect("user2", None) + /*.connect("user2", None) .await - .unwrap(); + .unwrap()*/; println!("Enter peer offer: "); diff --git a/libturms/examples/libturms/offer.rs b/libturms/examples/libturms/offer.rs index bea7db8..eb44370 100644 --- a/libturms/examples/libturms/offer.rs +++ b/libturms/examples/libturms/offer.rs @@ -33,9 +33,9 @@ async fn main() { let mut managed_turms = Turms::from_config(libturms::ConfigFinder::::Text(config)) .unwrap() - .connect("user", None) + /*.connect("user2", None) .await - .unwrap(); + .unwrap()*/; let offer = managed_turms.create_peer_offer().await.unwrap(); println!("My offer is: {offer}"); diff --git a/libturms/examples/tokio.rs b/libturms/examples/tokio.rs index d00fffa..3abc305 100644 --- a/libturms/examples/tokio.rs +++ b/libturms/examples/tokio.rs @@ -5,6 +5,8 @@ use libturms::p2p::models::*; use libturms::p2p::webrtc; use tracing_subscriber::prelude::*; +use std::io::{self, BufRead}; + const LOCAL_URL: &str = "http://localhost:4000"; #[tokio::main] @@ -20,9 +22,16 @@ async fn main() { .with(tracing_subscriber::fmt::layer()) .init(); + println!("Enter Turms token: "); + + let mut buffer = String::new(); + let stdin = io::stdin(); + let mut handle = stdin.lock(); + handle.read_line(&mut buffer).unwrap(); + let mut ws = websocket::WebSocket::new(LOCAL_URL).expect("URL is invalid."); - ws.connect("user", None) + ws.connect(buffer) .await .expect("Is the password wrong? Or server offline?"); diff --git a/libturms/src/channel.rs b/libturms/src/channel.rs index 27324c2..24ccd78 100644 --- a/libturms/src/channel.rs +++ b/libturms/src/channel.rs @@ -1,24 +1,102 @@ //! Handle p2p (webrtc) data channel. -use webrtc::data_channel::RTCDataChannel; +use p2p::models::{Event, X3DH}; +use p2p::webrtc::WebRTCManager; +use p2p::{get_account, triple_diffie_hellman}; +use tokio::sync::Mutex; +use vodozemac::olm::{OlmMessage, SessionConfig}; use webrtc::data_channel::data_channel_message::DataChannelMessage; use std::sync::Arc; -use crate::Turms; +#[derive(Clone, Debug)] +struct Handler { + webrtc: WebRTCManager, + label: String, +} /// Handle channel by parsing income messages. -pub fn handle_channel(_turms: &mut Turms, channel: Arc) { - let label = channel.label().to_owned(); - let d_label = label.clone(); - - channel.on_open(Box::new(move || { - tracing::info!(%label, "new channel opened"); - Box::pin(async {}) - })); - - channel.on_message(Box::new(move |msg: DataChannelMessage| { - tracing::trace!(%d_label, ?msg, "message received"); - Box::pin(async {}) - })); +pub fn handle_channel(webrtc: WebRTCManager) { + let Some(channel) = webrtc.channel.clone() else { + tracing::error!("no WebRTC channel"); + return; + }; + + let handler = Handler { + webrtc, + label: channel.label().to_owned(), + }; + + { + let h = handler.clone(); + channel.on_open(Box::new(move || { + let label = &h.label; + tracing::info!(%label, "new channel opened"); + Box::pin(async move { + if let Err(err) = triple_diffie_hellman(&h.webrtc).await { + tracing::error!(%err, "X3DH failed"); + }; + }) + })); + } + + { + let h = handler.clone(); + channel.on_message(Box::new(move |msg: DataChannelMessage| { + let label = &h.label; + tracing::trace!(%label, ?msg, "webrtc message received"); + + let mut webrtc = h.webrtc.clone(); + Box::pin(async move { + let data = match webrtc.session { + Some(session) => { + let message = match vodozemac::olm::Message::from_bytes(&msg.data) { + Ok(msg) => msg, + Err(_) => { + return; + } + }; + + session.lock().await.decrypt(&OlmMessage::from(message)).unwrap_or_else(|_| msg.data.to_vec()) + } + None => msg.data.to_vec(), + }; + + let Ok(json) = serde_json::from_slice(&data) else { + tracing::debug!("decoding failed"); + return; + }; + tracing::debug!(?json, "decoded webrtc message"); + + match json { + Event::DHKey(x3dh) => { + let mut account = get_account().lock().await; + let public_key = account.curve25519_key(); + if let Some(otk) = x3dh.otk { + let mut session: vodozemac::olm::Session = account.create_outbound_session(SessionConfig::version_2(), x3dh.public_key, otk); + let message = session.encrypt(""); + webrtc.session = Some(Arc::new(Mutex::new(session))); + if let OlmMessage::PreKey(pk) = message { + match serde_json::to_string(&Event::DHKey(X3DH { public_key, otk: None, prekey: Some(pk) })) { + Ok(message) => { + if let Err(err) = webrtc.send(message).await { + tracing::error!(%err, "failed to send message"); + } + } + Err(err) => tracing::error!(%err, "failed to serialize DHKey event"), + } + } + } else if let Some(prekey) = x3dh.prekey { + if let Err(err) = account.create_inbound_session(x3dh.public_key, &prekey) { + tracing::error!(%err, "failed to create inbound session"); + } + } else { + tracing::error!("received X3DH request without otk nor pre-key"); + } + }, + _ => unimplemented!(), + } + }) + })); + } } diff --git a/libturms/src/lib.rs b/libturms/src/lib.rs index 2713dea..361b239 100644 --- a/libturms/src/lib.rs +++ b/libturms/src/lib.rs @@ -69,13 +69,9 @@ impl Turms { } /// Init WebSocket connection and handle messages. - pub async fn connect( - mut self, - identifier: T, - password: Option, - ) -> Result { + pub async fn connect(mut self, token: T) -> Result { if let Some(ref mut turms) = self.turms { - turms.connect(identifier, password).await?; + turms.connect(token.to_string()).await?; let ws = turms.reader.clone().ok_or(error::ConnectionClosed)?; tokio::spawn(async move { @@ -96,8 +92,8 @@ impl Turms { pub async fn create_peer_offer(&mut self) -> Result { let mut webrtc = WebRTCManager::init().await?; - let channel = webrtc.create_channel().await?; - channel::handle_channel(self, channel); + let _channel = webrtc.create_channel().await?; + channel::handle_channel(webrtc.clone()); let offer = webrtc.create_offer().await?; // use offer-answer common datas later. @@ -127,8 +123,8 @@ impl Turms { pub async fn answer_to_peer(&mut self, offer: String) -> Result { let mut webrtc = WebRTCManager::init().await?; - let channel = webrtc.create_channel().await?; - channel::handle_channel(self, channel); + let _channel = webrtc.create_channel().await?; + channel::handle_channel(webrtc.clone()); let offer = webrtc.connect(offer).await?; diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 9e4f58c..9fe7d9b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,3 +13,4 @@ bitflags = { version = "2", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } error = { path = "../error" } vodozemac = "0.9" +bytes = "1.10" diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 4e91351..ecd9caf 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -8,30 +8,31 @@ pub mod webrtc; /// X3DH over WebRTC for Turms. mod x3dh; -use std::sync::{Mutex, OnceLock}; +pub use x3dh::triple_diffie_hellman; -static ACCOUNT: OnceLock> = OnceLock::new(); +use tokio::sync::Mutex; +use vodozemac::olm::Account; + +use std::sync::OnceLock; + +static ACCOUNT: OnceLock> = OnceLock::new(); + +pub fn get_account() -> &'static Mutex { + ACCOUNT.get_or_init(|| Mutex::new(Account::new())) +} /// Get user account. -pub fn save_account() -> error::Result { - let account = - ACCOUNT.get_or_init(|| Mutex::new(vodozemac::olm::Account::new())); - - Ok(serde_json::to_string( - &account - .lock() - .map_err(|_| error::Error::MutexPoisoned)? - .pickle(), - )?) +pub async fn save_account() -> error::Result { + let account = get_account(); + + Ok(serde_json::to_string(&account.lock().await.pickle())?) } /// Set user account. pub fn restore_account(json: &str) -> Result<(), serde_json::Error> { let pickle: vodozemac::olm::AccountPickle = serde_json::from_str(json)?; - let _ = ACCOUNT.get_or_init(|| { - Mutex::new(vodozemac::olm::Account::from_pickle(pickle)) - }); + let _ = ACCOUNT.get_or_init(|| Mutex::new(Account::from_pickle(pickle))); Ok(()) } diff --git a/p2p/src/models.rs b/p2p/src/models.rs index b2c24a0..7cfb5ed 100644 --- a/p2p/src/models.rs +++ b/p2p/src/models.rs @@ -1,6 +1,8 @@ use bitflags::bitflags; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use vodozemac::Curve25519PublicKey; +use vodozemac::olm::PreKeyMessage; /// Encapsulates events. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -13,9 +15,11 @@ pub enum Event { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct X3DH { /// Curve25519 public key. - pub public_key: Vec, - /// One-time-key. - pub otk: Vec, + pub public_key: Curve25519PublicKey, + /// One-time key. + pub otk: Option, + /// Receiver pre-key. + pub prekey: Option, } /// Represents a message in a chat. diff --git a/p2p/src/webrtc.rs b/p2p/src/webrtc.rs index 0853890..fe97e18 100644 --- a/p2p/src/webrtc.rs +++ b/p2p/src/webrtc.rs @@ -1,5 +1,6 @@ use error::Result; -use vodozemac::olm::Account; +use tokio::sync::Mutex as AsyncMutex; +use tokio::time::{Duration, sleep}; use webrtc::api::APIBuilder; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::MediaEngine; @@ -15,8 +16,7 @@ use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use std::fmt; use std::sync::{Arc, Mutex}; -use crate::ACCOUNT; -use crate::models::X3DH; +const MAX_ATTEMPTS: u8 = 4; /// Simple WebRTC connection manager. #[derive(Clone)] @@ -27,7 +27,10 @@ pub struct WebRTCManager { pub ice: Arc>>, /// Data channel. pub channel: Option>, - is_initiator: bool, + /// Cryptographic session. + pub session: Option>>, + /// Know if user is offerer. + pub is_initiator: bool, offer: String, } @@ -52,7 +55,7 @@ impl WebRTCManager { .build(); // If account is not initialized, consider creating a new one. - ACCOUNT.get_or_init(|| Mutex::new(Account::new())); + crate::get_account(); let peer_connection = Arc::new(api.new_peer_connection(config).await?); let webrtc = WebRTCManager { @@ -61,6 +64,7 @@ impl WebRTCManager { offer: String::default(), is_initiator: false, channel: None, + session: None, }; let ice = Arc::downgrade(&webrtc.ice); @@ -110,11 +114,14 @@ impl WebRTCManager { .await .recv() .await; - let offer = - self.peer_connection - .local_description() - .await - .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + + let offer = self + .peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC( + webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription, + ))?; self.offer = serde_json::to_string(&offer)?; self.is_initiator = true; @@ -134,11 +141,14 @@ impl WebRTCManager { .await .recv() .await; - let answer = - self.peer_connection - .local_description() - .await - .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + + let answer = self + .peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC( + webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription, + ))?; self.offer = serde_json::to_string(&answer)?; @@ -167,21 +177,9 @@ impl WebRTCManager { .create_data_channel("data", Some(dc_init)) .await?; - let acc = Self { - channel: Some(Arc::clone(&channel)), - offer: String::default(), - ..self.clone() // uses Arc::clone(&T). - }; - channel.on_open(Box::new(move || { - Box::pin(async move { crate::triple_diffie_hellman!(acc) }) - })); + self.channel = Some(Arc::clone(&channel)); - self.channel = Some(channel); - - Ok(self - .channel - .clone() - .ok_or(webrtc::Error::ErrConnectionClosed)?) + Ok(channel) } /// Convert a [`String`] to [`RTCSessionDescription`]. @@ -191,6 +189,43 @@ impl WebRTCManager { ) -> Result { Ok(serde_json::from_str(session)?) } + + /// Sender with retries. + /// Useful during X3DH negociation. + pub async fn send(&self, message: String) -> Result<()> { + let msg = match self.session.clone() { + Some(session) => { + session.lock().await.encrypt(message).message().to_vec() + }, + None => message.as_bytes().to_vec(), + }; + + match self.channel.as_ref() { + Some(ch) => { + for n in 0..MAX_ATTEMPTS { + if n > 0 { + // Wait only if first try is failed. + sleep(Duration::from_secs(u64::from(n) * 5)).await; + } + + match ch.send(&bytes::Bytes::from(msg.clone())).await { + Ok(_) => break, + Err(err) => { + tracing::error!(%err, "{n}th attempt to send message failed"); + if n == MAX_ATTEMPTS - 1 { + return Err(error::Error::MessageSendFailed); + } + }, + } + } + + Ok(()) + }, + None => { + Err(error::Error::WebRTC(webrtc::Error::ErrDataChannelNotOpen)) + }, + } + } } impl fmt::Debug for WebRTCManager { diff --git a/p2p/src/x3dh.rs b/p2p/src/x3dh.rs index 74923fa..cd3baac 100644 --- a/p2p/src/x3dh.rs +++ b/p2p/src/x3dh.rs @@ -1,76 +1,46 @@ -pub const MAX_ATTEMPTS: u8 = 4; +use error::{Error, Result}; -/// Spawn triple diffie hellman generation logic over WebRTC. -#[macro_export] -macro_rules! triple_diffie_hellman { - ($acc:expr) => {{ - use $crate::models::Event::DHKey; - use $crate::ACCOUNT; - use $crate::x3dh::MAX_ATTEMPTS; - use tracing::{error, debug}; - use tokio::time::{sleep, Duration}; - - if $acc.is_initiator { - return; - } - - let account = - ACCOUNT.get_or_init(|| Mutex::new(Account::new())); - - // Generate public key and one-time key. - let (public_key, otk) = { - let mut account = match account.lock() { - Ok(a) => a, - Err(_) => { - error!("account mutex is poisoned"); - return; - }, - }; - - account.generate_one_time_keys(1); - - let public_key = account.curve25519_key().to_vec(); - let otk = match account.one_time_keys().values().next() { - Some(k) => k.to_vec(), - None => { - // Since insertion occurs before, this should never happen here. - // If this is the case, it is best to restart. Let it crash. Poison Mutex. - unreachable!() - }, - }; +use crate::models::Event::DHKey; +use crate::models::X3DH; - (public_key, otk) - }; - - let message = match serde_json::to_string(&DHKey(X3DH { - public_key, - otk, - })) { - Ok(message) => message, - Err(err) => { - error!(%err, "serialization failed"); - return; +/// Spawn triple diffie hellman generation logic over WebRTC. +pub async fn triple_diffie_hellman( + acc: &crate::webrtc::WebRTCManager, +) -> Result<()> { + if acc.is_initiator { + return Ok(()); + } + + let account = crate::get_account(); + + // Generate public key and one-time key. + let (public_key, otk) = { + let mut account = account.lock().await; + + account.generate_one_time_keys(1); + + let public_key = account.curve25519_key(); + let otk = match account.one_time_keys().values().next() { + Some(k) => *k, + None => { + // Since insertion occurs before, this should never happen here. + return Err(Error::MutexPoisoned); }, }; - if let Some(ch) = $acc.channel.as_ref() { - for n in 0..MAX_ATTEMPTS { - if n > 0 { - // Wait only if first try is failed. - sleep(Duration::from_secs(u64::from(n) * 5)).await; - } + (public_key, Some(otk)) + }; + + let message = serde_json::to_string(&DHKey(X3DH { + public_key, + otk, + prekey: None, + }))?; + + if acc.send(message).await.is_ok() { + account.lock().await.mark_keys_as_published(); + tracing::debug!("public key and one-time key published"); + }; - match ch.send_text(&message).await { - Ok(_) => { - if let Ok(mut acc_lock) = account.lock() { - acc_lock.mark_keys_as_published(); - debug!("public key and one-time key published") - } - break; - }, - Err(err) => error!(%err, "{n}th attempt to send keys to a peer failed"), - } - } - } - }}; + Ok(()) }