From 4afb9a81a4f8e2567dce011c3286cb875051d860 Mon Sep 17 00:00:00 2001 From: RealHinome Date: Fri, 24 Oct 2025 17:49:12 +0200 Subject: [PATCH 1/6] Init --- Cargo.lock | 241 ++++++++++++++++++++++++++++++++++++++++++++-- p2p/Cargo.toml | 1 + p2p/src/lib.rs | 2 + p2p/src/models.rs | 9 ++ p2p/src/webrtc.rs | 76 +++++++++++++-- p2p/src/x3dh.rs | 8 ++ 6 files changed, 322 insertions(+), 15 deletions(-) create mode 100644 p2p/src/x3dh.rs diff --git a/Cargo.lock b/Cargo.lock index 57d45f7..6dfd8d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,6 +88,15 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +dependencies = [ + "serde", +] + [[package]] name = "asn1-rs" version = "0.6.2" @@ -279,6 +288,30 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3613f74bd2eac03dad61bd53dbe620703d4371614fe0bc3b9f04dd36fe4e818" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "chacha20poly1305" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.41" @@ -302,6 +335,7 @@ checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" dependencies = [ "crypto-common", "inout", + "zeroize", ] [[package]] @@ -394,6 +428,7 @@ dependencies = [ "digest", "fiat-crypto", "rustc_version", + "serde", "subtle", "zeroize", ] @@ -512,6 +547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ "pkcs8", + "serde", "signature", ] @@ -523,12 +559,19 @@ checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" dependencies = [ "curve25519-dalek", "ed25519", + "rand_core 0.6.4", "serde", "sha2", "subtle", "zeroize", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -1091,9 +1134,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" dependencies = [ "equivalent", "hashbrown", @@ -1145,6 +1188,15 @@ dependencies = [ "serde", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1260,6 +1312,29 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matrix-pickle" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2551de3bba2cc65b52dc6b268df6114011fe118ac24870fbcf1b35537bd721" +dependencies = [ + "matrix-pickle-derive", + "thiserror 1.0.69", +] + +[[package]] +name = "matrix-pickle-derive" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f75de44c3120d78e978adbcf6d453b20ba011f3c46363e52d1dbbc72f545e9fb" +dependencies = [ + "proc-macro-crate", + "proc-macro-error2", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1526,6 +1601,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "vodozemac", "webrtc", ] @@ -1628,6 +1704,17 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "poly1305" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "polyval" version = "0.6.2" @@ -1679,6 +1766,36 @@ dependencies = [ "elliptic-curve", ] +[[package]] +name = "proc-macro-crate" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +dependencies = [ + "toml_edit", +] + +[[package]] +name = "proc-macro-error-attr2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5" +dependencies = [ + "proc-macro2", + "quote", +] + +[[package]] +name = "proc-macro-error2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802" +dependencies = [ + "proc-macro-error-attr2", + "proc-macro2", + "quote", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -1688,6 +1805,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quote" version = "1.0.40" @@ -2087,18 +2227,38 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_bytes" +version = "0.11.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" +dependencies = [ + "serde", + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -2529,6 +2689,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.23.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + [[package]] name = "tower" version = "0.5.2" @@ -2765,6 +2955,36 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vodozemac" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c022a277687e4e8685d72b95a7ca3ccfec907daa946678e715f8badaa650883d" +dependencies = [ + "aes", + "arrayvec", + "base64", + "base64ct", + "cbc", + "chacha20poly1305", + "curve25519-dalek", + "ed25519-dalek", + "getrandom 0.2.16", + "hkdf", + "hmac", + "matrix-pickle", + "prost", + "rand 0.8.5", + "serde", + "serde_bytes", + "serde_json", + "sha2", + "subtle", + "thiserror 2.0.12", + "x25519-dalek", + "zeroize", +] + [[package]] name = "waitgroup" version = "0.1.2" @@ -3335,6 +3555,15 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "winnow" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index bb16f93..9e4f58c 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -12,3 +12,4 @@ webrtc = { workspace = true } bitflags = { version = "2", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } error = { path = "../error" } +vodozemac = "0.9" diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 922519e..9b75cd5 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -5,3 +5,5 @@ pub mod models; #[deny(missing_docs, missing_debug_implementations)] /// WebRTC interface. pub mod webrtc; +/// X3DH over WebRTC for Turms. +mod x3dh; diff --git a/p2p/src/models.rs b/p2p/src/models.rs index ceb6230..b2c24a0 100644 --- a/p2p/src/models.rs +++ b/p2p/src/models.rs @@ -5,10 +5,19 @@ use serde::{Deserialize, Serialize}; /// Encapsulates events. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Event { + DHKey(X3DH), Message(Message), Typing, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct X3DH { + /// Curve25519 public key. + pub public_key: Vec, + /// One-time-key. + pub otk: Vec, +} + /// Represents a message in a chat. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { diff --git a/p2p/src/webrtc.rs b/p2p/src/webrtc.rs index dd89ba5..f9a4186 100644 --- a/p2p/src/webrtc.rs +++ b/p2p/src/webrtc.rs @@ -1,4 +1,5 @@ use error::Result; +use vodozemac::olm::Account; use webrtc::api::APIBuilder; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::MediaEngine; @@ -14,15 +15,19 @@ use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use std::fmt; use std::sync::{Arc, Mutex}; +use crate::models::X3DH; + /// Simple WebRTC connection manager. #[derive(Clone)] pub struct WebRTCManager { /// Granularity. pub peer_connection: Arc, - /// Candidate ICE. + /// ICE candidates. pub ice: Arc>>, /// Data channel. pub channel: Option>, + is_initiator: bool, + account: Arc>, offer: String, } @@ -51,7 +56,9 @@ impl WebRTCManager { peer_connection, ice: Arc::new(Mutex::new(Vec::new())), offer: String::default(), + is_initiator: false, channel: None, + account: Arc::new(Mutex::new(Account::new())), }; let ice = Arc::downgrade(&webrtc.ice); @@ -98,9 +105,14 @@ impl WebRTCManager { .await .recv() .await; - let offer = self.peer_connection.local_description().await.ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + let offer = + self.peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; self.offer = serde_json::to_string(&offer)?; + self.is_initiator = true; Ok(self.offer.clone()) } @@ -117,7 +129,11 @@ impl WebRTCManager { .await .recv() .await; - let answer = self.peer_connection.local_description().await.ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + let answer = + self.peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; self.offer = serde_json::to_string(&answer)?; @@ -130,6 +146,7 @@ impl WebRTCManager { self.peer_connection .set_remote_description(peer_offer) .await?; + self.is_initiator = false; self.create_answer().await } @@ -140,11 +157,53 @@ impl WebRTCManager { ..Default::default() }; - self.channel = Some( - self.peer_connection - .create_data_channel("data", Some(dc_init)) - .await?, - ); + let channel = self + .peer_connection + .create_data_channel("data", Some(dc_init)) + .await?; + + let acc = Self { + channel: Some(Arc::clone(&channel)), + offer: String::default(), + ..self.clone() // uses Arc::clone(&T). + }; + channel.on_open(Box::new(move || { + Box::pin(async move { + use crate::models::Event::DHKey; + + // Initate XDH3 key creation from the other side. + if acc.is_initiator { + return; + } + + // Generate public key. + acc.account.lock().unwrap().generate_one_time_keys(1); + let public_key = acc.account.lock().unwrap().curve25519_key().to_vec(); + let otk = acc + .account + .lock() + .unwrap() + .one_time_keys(); + let otk = otk.values() + .next() + .ok_or(error::Error::AuthenticationFailed) + .unwrap().to_vec(); + + acc + .account + .lock() + .unwrap().mark_keys_as_published(); + + // Send to peer second encryption layer. + acc.channel + .unwrap() + .send_text(serde_json::to_string(&DHKey(X3DH {public_key, otk})).unwrap()) + .await + .unwrap(); + }) + })); + + self.channel = Some(channel); Ok(self .channel @@ -165,7 +224,6 @@ impl fmt::Debug for WebRTCManager { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WebRTCManager") .field("peer_connection", &self.peer_connection) - .field("ice", &self.ice.lock()) .field( "channel", &self.channel.as_ref().map(|_| ""), diff --git a/p2p/src/x3dh.rs b/p2p/src/x3dh.rs new file mode 100644 index 0000000..94693f9 --- /dev/null +++ b/p2p/src/x3dh.rs @@ -0,0 +1,8 @@ +/// Spawn triple diffie hellman generation logic over WebRTC. +#[macro_export] +macro_rules! triple_diffie_hellman { + ($channel:expr) => {{ + use std::sync::Arc; + let + }}; +} From f019bfcf90c19086ff5b5ba600a1586a6ab70582 Mon Sep 17 00:00:00 2001 From: RealHinome Date: Sat, 25 Oct 2025 17:52:19 +0200 Subject: [PATCH 2/6] X3DH --- discover/src/jwt.rs | 16 +++++----- error/src/lib.rs | 2 ++ p2p/src/lib.rs | 28 +++++++++++++++++ p2p/src/webrtc.rs | 52 +++++++------------------------ p2p/src/x3dh.rs | 74 +++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 120 insertions(+), 52 deletions(-) diff --git a/discover/src/jwt.rs b/discover/src/jwt.rs index 1d3e76f..265afba 100644 --- a/discover/src/jwt.rs +++ b/discover/src/jwt.rs @@ -180,16 +180,16 @@ impl TokenManager { .unwrap_or_default() .as_secs(); - if let Some(expire_at) = claims.expire_at { - if expire_at < timestamp { - return Err(Error::TokenExpired { expire_at }); - } + if let Some(expire_at) = claims.expire_at + && expire_at < timestamp + { + return Err(Error::TokenExpired { expire_at }); } - if let Some(not_before) = claims.not_before { - if not_before > timestamp { - return Err(Error::TooEarly { not_before }); - } + if let Some(not_before) = claims.not_before + && not_before > timestamp + { + return Err(Error::TooEarly { not_before }); } Ok(claims) diff --git a/error/src/lib.rs b/error/src/lib.rs index 561f96b..c8dde9e 100644 --- a/error/src/lib.rs +++ b/error/src/lib.rs @@ -32,6 +32,8 @@ pub enum Error { #[error(transparent)] WebRTC(#[from] webrtc::error::Error), + #[error("mutex is poisoned")] + MutexPoisoned, #[error("authentication failed")] AuthenticationFailed, diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 9b75cd5..4e91351 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -7,3 +7,31 @@ pub mod models; pub mod webrtc; /// X3DH over WebRTC for Turms. mod x3dh; + +use std::sync::{Mutex, OnceLock}; + +static ACCOUNT: OnceLock> = OnceLock::new(); + +/// Get user account. +pub fn save_account() -> error::Result { + let account = + ACCOUNT.get_or_init(|| Mutex::new(vodozemac::olm::Account::new())); + + Ok(serde_json::to_string( + &account + .lock() + .map_err(|_| error::Error::MutexPoisoned)? + .pickle(), + )?) +} + +/// Set user account. +pub fn restore_account(json: &str) -> Result<(), serde_json::Error> { + let pickle: vodozemac::olm::AccountPickle = serde_json::from_str(json)?; + + let _ = ACCOUNT.get_or_init(|| { + Mutex::new(vodozemac::olm::Account::from_pickle(pickle)) + }); + + Ok(()) +} diff --git a/p2p/src/webrtc.rs b/p2p/src/webrtc.rs index f9a4186..0853890 100644 --- a/p2p/src/webrtc.rs +++ b/p2p/src/webrtc.rs @@ -15,6 +15,7 @@ use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use std::fmt; use std::sync::{Arc, Mutex}; +use crate::ACCOUNT; use crate::models::X3DH; /// Simple WebRTC connection manager. @@ -27,7 +28,6 @@ pub struct WebRTCManager { /// Data channel. pub channel: Option>, is_initiator: bool, - account: Arc>, offer: String, } @@ -51,6 +51,9 @@ impl WebRTCManager { .with_interceptor_registry(registry) .build(); + // If account is not initialized, consider creating a new one. + ACCOUNT.get_or_init(|| Mutex::new(Account::new())); + let peer_connection = Arc::new(api.new_peer_connection(config).await?); let webrtc = WebRTCManager { peer_connection, @@ -58,7 +61,6 @@ impl WebRTCManager { offer: String::default(), is_initiator: false, channel: None, - account: Arc::new(Mutex::new(Account::new())), }; let ice = Arc::downgrade(&webrtc.ice); @@ -75,10 +77,13 @@ impl WebRTCManager { ); // If Mutex is poisoned, it would be a non-sense. - // Leave the unusable connection as it is. - // Connection can't be properly closed here. if let Ok(mut ice_candidates) = ice.lock() { ice_candidates.push(candidate); + } else { + tracing::error!( + ?candidate, + "mutex was poisoned, aborting candidate" + ); } }, None => { @@ -168,39 +173,7 @@ impl WebRTCManager { ..self.clone() // uses Arc::clone(&T). }; channel.on_open(Box::new(move || { - Box::pin(async move { - use crate::models::Event::DHKey; - - // Initate XDH3 key creation from the other side. - if acc.is_initiator { - return; - } - - // Generate public key. - acc.account.lock().unwrap().generate_one_time_keys(1); - let public_key = acc.account.lock().unwrap().curve25519_key().to_vec(); - let otk = acc - .account - .lock() - .unwrap() - .one_time_keys(); - let otk = otk.values() - .next() - .ok_or(error::Error::AuthenticationFailed) - .unwrap().to_vec(); - - acc - .account - .lock() - .unwrap().mark_keys_as_published(); - - // Send to peer second encryption layer. - acc.channel - .unwrap() - .send_text(serde_json::to_string(&DHKey(X3DH {public_key, otk})).unwrap()) - .await - .unwrap(); - }) + Box::pin(async move { crate::triple_diffie_hellman!(acc) }) })); self.channel = Some(channel); @@ -224,11 +197,8 @@ impl fmt::Debug for WebRTCManager { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WebRTCManager") .field("peer_connection", &self.peer_connection) - .field( - "channel", - &self.channel.as_ref().map(|_| ""), - ) .field("offer", &self.offer) + .field("is_initiator", &self.is_initiator) .finish() } } diff --git a/p2p/src/x3dh.rs b/p2p/src/x3dh.rs index 94693f9..74923fa 100644 --- a/p2p/src/x3dh.rs +++ b/p2p/src/x3dh.rs @@ -1,8 +1,76 @@ +pub const MAX_ATTEMPTS: u8 = 4; + /// Spawn triple diffie hellman generation logic over WebRTC. #[macro_export] macro_rules! triple_diffie_hellman { - ($channel:expr) => {{ - use std::sync::Arc; - let + ($acc:expr) => {{ + use $crate::models::Event::DHKey; + use $crate::ACCOUNT; + use $crate::x3dh::MAX_ATTEMPTS; + use tracing::{error, debug}; + use tokio::time::{sleep, Duration}; + + if $acc.is_initiator { + return; + } + + let account = + ACCOUNT.get_or_init(|| Mutex::new(Account::new())); + + // Generate public key and one-time key. + let (public_key, otk) = { + let mut account = match account.lock() { + Ok(a) => a, + Err(_) => { + error!("account mutex is poisoned"); + return; + }, + }; + + account.generate_one_time_keys(1); + + let public_key = account.curve25519_key().to_vec(); + let otk = match account.one_time_keys().values().next() { + Some(k) => k.to_vec(), + None => { + // Since insertion occurs before, this should never happen here. + // If this is the case, it is best to restart. Let it crash. Poison Mutex. + unreachable!() + }, + }; + + (public_key, otk) + }; + + let message = match serde_json::to_string(&DHKey(X3DH { + public_key, + otk, + })) { + Ok(message) => message, + Err(err) => { + error!(%err, "serialization failed"); + return; + }, + }; + + if let Some(ch) = $acc.channel.as_ref() { + for n in 0..MAX_ATTEMPTS { + if n > 0 { + // Wait only if first try is failed. + sleep(Duration::from_secs(u64::from(n) * 5)).await; + } + + match ch.send_text(&message).await { + Ok(_) => { + if let Ok(mut acc_lock) = account.lock() { + acc_lock.mark_keys_as_published(); + debug!("public key and one-time key published") + } + break; + }, + Err(err) => error!(%err, "{n}th attempt to send keys to a peer failed"), + } + } + } }}; } From 147d6f0f26122491310c5215f07e4ddc14f46bb2 Mon Sep 17 00:00:00 2001 From: RealHinome Date: Mon, 27 Oct 2025 11:17:07 +0100 Subject: [PATCH 3/6] Improve X3DH handler --- libturms/src/channel.rs | 52 ++++++++++++----- libturms/src/lib.rs | 8 +-- p2p/src/lib.rs | 2 + p2p/src/webrtc.rs | 13 +---- p2p/src/x3dh.rs | 122 ++++++++++++++++++++-------------------- 5 files changed, 106 insertions(+), 91 deletions(-) diff --git a/libturms/src/channel.rs b/libturms/src/channel.rs index 27324c2..197ee84 100644 --- a/libturms/src/channel.rs +++ b/libturms/src/channel.rs @@ -1,24 +1,46 @@ //! Handle p2p (webrtc) data channel. -use webrtc::data_channel::RTCDataChannel; +use p2p::triple_diffie_hellman; +use p2p::webrtc::WebRTCManager; use webrtc::data_channel::data_channel_message::DataChannelMessage; -use std::sync::Arc; - -use crate::Turms; +#[derive(Clone, Debug)] +struct Handler { + webrtc: WebRTCManager, + label: String, +} /// Handle channel by parsing income messages. -pub fn handle_channel(_turms: &mut Turms, channel: Arc) { - let label = channel.label().to_owned(); - let d_label = label.clone(); +pub fn handle_channel(webrtc: WebRTCManager) { + let Some(channel) = webrtc.channel.clone() else { + tracing::error!("no WebRTC channel"); + return; + }; + + let handler = Handler { + webrtc, + label: channel.label().to_owned(), + }; - channel.on_open(Box::new(move || { - tracing::info!(%label, "new channel opened"); - Box::pin(async {}) - })); + { + let h = handler.clone(); + channel.on_open(Box::new(move || { + let label = &h.label; + tracing::info!(%label, "new channel opened"); + Box::pin(async move { + if let Err(err) = triple_diffie_hellman(&h.webrtc).await { + tracing::error!(%err, "X3DH failed"); + }; + }) + })); + } - channel.on_message(Box::new(move |msg: DataChannelMessage| { - tracing::trace!(%d_label, ?msg, "message received"); - Box::pin(async {}) - })); + { + let h = handler.clone(); + channel.on_message(Box::new(move |msg: DataChannelMessage| { + let label = &h.label; + tracing::trace!(%label, ?msg, "message received"); + Box::pin(async {}) + })); + } } diff --git a/libturms/src/lib.rs b/libturms/src/lib.rs index 2713dea..595fac1 100644 --- a/libturms/src/lib.rs +++ b/libturms/src/lib.rs @@ -96,8 +96,8 @@ impl Turms { pub async fn create_peer_offer(&mut self) -> Result { let mut webrtc = WebRTCManager::init().await?; - let channel = webrtc.create_channel().await?; - channel::handle_channel(self, channel); + let _channel = webrtc.create_channel().await?; + channel::handle_channel(webrtc.clone()); let offer = webrtc.create_offer().await?; // use offer-answer common datas later. @@ -127,8 +127,8 @@ impl Turms { pub async fn answer_to_peer(&mut self, offer: String) -> Result { let mut webrtc = WebRTCManager::init().await?; - let channel = webrtc.create_channel().await?; - channel::handle_channel(self, channel); + let _channel = webrtc.create_channel().await?; + channel::handle_channel(webrtc.clone()); let offer = webrtc.connect(offer).await?; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 4e91351..0ce7695 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -8,6 +8,8 @@ pub mod webrtc; /// X3DH over WebRTC for Turms. mod x3dh; +pub use x3dh::triple_diffie_hellman; + use std::sync::{Mutex, OnceLock}; static ACCOUNT: OnceLock> = OnceLock::new(); diff --git a/p2p/src/webrtc.rs b/p2p/src/webrtc.rs index 0853890..bf6a056 100644 --- a/p2p/src/webrtc.rs +++ b/p2p/src/webrtc.rs @@ -16,7 +16,6 @@ use std::fmt; use std::sync::{Arc, Mutex}; use crate::ACCOUNT; -use crate::models::X3DH; /// Simple WebRTC connection manager. #[derive(Clone)] @@ -27,7 +26,8 @@ pub struct WebRTCManager { pub ice: Arc>>, /// Data channel. pub channel: Option>, - is_initiator: bool, + /// Know if user is offerer. + pub is_initiator: bool, offer: String, } @@ -167,15 +167,6 @@ impl WebRTCManager { .create_data_channel("data", Some(dc_init)) .await?; - let acc = Self { - channel: Some(Arc::clone(&channel)), - offer: String::default(), - ..self.clone() // uses Arc::clone(&T). - }; - channel.on_open(Box::new(move || { - Box::pin(async move { crate::triple_diffie_hellman!(acc) }) - })); - self.channel = Some(channel); Ok(self diff --git a/p2p/src/x3dh.rs b/p2p/src/x3dh.rs index 74923fa..5be429e 100644 --- a/p2p/src/x3dh.rs +++ b/p2p/src/x3dh.rs @@ -1,76 +1,76 @@ -pub const MAX_ATTEMPTS: u8 = 4; +use error::{Error, Result}; +use tokio::time::{Duration, sleep}; +use tracing::{debug, error}; +use vodozemac::olm::Account; -/// Spawn triple diffie hellman generation logic over WebRTC. -#[macro_export] -macro_rules! triple_diffie_hellman { - ($acc:expr) => {{ - use $crate::models::Event::DHKey; - use $crate::ACCOUNT; - use $crate::x3dh::MAX_ATTEMPTS; - use tracing::{error, debug}; - use tokio::time::{sleep, Duration}; - - if $acc.is_initiator { - return; - } +use crate::ACCOUNT; +use crate::models::Event::DHKey; +use crate::models::X3DH; - let account = - ACCOUNT.get_or_init(|| Mutex::new(Account::new())); +use std::sync::Mutex; - // Generate public key and one-time key. - let (public_key, otk) = { - let mut account = match account.lock() { - Ok(a) => a, - Err(_) => { - error!("account mutex is poisoned"); - return; - }, - }; +const MAX_ATTEMPTS: u8 = 4; - account.generate_one_time_keys(1); +/// Spawn triple diffie hellman generation logic over WebRTC. +pub async fn triple_diffie_hellman( + acc: &crate::webrtc::WebRTCManager, +) -> Result<()> { + if acc.is_initiator { + return Ok(()); + } - let public_key = account.curve25519_key().to_vec(); - let otk = match account.one_time_keys().values().next() { - Some(k) => k.to_vec(), - None => { - // Since insertion occurs before, this should never happen here. - // If this is the case, it is best to restart. Let it crash. Poison Mutex. - unreachable!() - }, - }; + let account = ACCOUNT.get_or_init(|| Mutex::new(Account::new())); - (public_key, otk) + // Generate public key and one-time key. + let (public_key, otk) = { + let mut account = match account.lock() { + Ok(a) => a, + Err(_) => { + error!("account mutex is poisoned"); + return Err(Error::MutexPoisoned); + }, }; - let message = match serde_json::to_string(&DHKey(X3DH { - public_key, - otk, - })) { - Ok(message) => message, - Err(err) => { - error!(%err, "serialization failed"); - return; + account.generate_one_time_keys(1); + + let public_key = account.curve25519_key().to_vec(); + let otk = match account.one_time_keys().values().next() { + Some(k) => k.to_vec(), + None => { + // Since insertion occurs before, this should never happen here. + return Err(Error::MutexPoisoned); }, }; - if let Some(ch) = $acc.channel.as_ref() { - for n in 0..MAX_ATTEMPTS { - if n > 0 { - // Wait only if first try is failed. - sleep(Duration::from_secs(u64::from(n) * 5)).await; - } + (public_key, otk) + }; + + let message = serde_json::to_string(&DHKey(X3DH { public_key, otk }))?; - match ch.send_text(&message).await { - Ok(_) => { - if let Ok(mut acc_lock) = account.lock() { - acc_lock.mark_keys_as_published(); - debug!("public key and one-time key published") - } - break; - }, - Err(err) => error!(%err, "{n}th attempt to send keys to a peer failed"), - } + if let Some(ch) = acc.channel.as_ref() { + for n in 0..MAX_ATTEMPTS { + if n > 0 { + // Wait only if first try is failed. + sleep(Duration::from_secs(u64::from(n) * 5)).await; + } + + match ch.send_text(&message).await { + Ok(_) => { + if let Ok(mut acc_lock) = account.lock() { + acc_lock.mark_keys_as_published(); + debug!("public key and one-time key published"); + } + break; + }, + Err(err) => { + error!(%err, "{n}th attempt to send keys to a peer failed"); + if n == MAX_ATTEMPTS - 1 { + return Err(Error::MessageSendFailed); + } + }, } } - }}; + } + + Ok(()) } From 6b1b908803260bc9fbe364880515cd9c85911750 Mon Sep 17 00:00:00 2001 From: RealHinome Date: Mon, 27 Oct 2025 11:18:39 +0100 Subject: [PATCH 4/6] Fix discovery server connection --- discover/src/websocket.rs | 47 ++++----------------------------------- libturms/src/lib.rs | 8 ++----- 2 files changed, 6 insertions(+), 49 deletions(-) diff --git a/discover/src/websocket.rs b/discover/src/websocket.rs index 44c8db2..c17daab 100644 --- a/discover/src/websocket.rs +++ b/discover/src/websocket.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use crate::models::phoenix::Message as PhxMessage; -use crate::models::response::{Response, Status}; type _Sender = SplitSink>, Message>; @@ -27,7 +26,6 @@ type Reader = const DEFAULT_QUEUED_MESSAGE: usize = 32; const SOCKET_PATH: &str = "/socket/websocket"; -const AUTH_PATH: &str = "/api/auth"; /// WebSocket client manager. #[allow(dead_code)] @@ -73,12 +71,6 @@ pub struct WebSocket { max_queued_message: usize, } -#[derive(Debug, Serialize)] -struct Auth { - vanity: String, - password: Option, -} - impl WebSocket { /// Create a new [`WebSocket`] without connecting it. pub fn new>(url: T) -> Result { @@ -96,22 +88,10 @@ impl WebSocket { }) } - fn get_scheme(&self, base: &str) -> String { - match self.url.scheme() { - "https" | "wss" => format!("{base}s"), - _ => base.to_owned(), - } - } - /// Establish the WebSocket connection. /// - /// First, it makes an HTTP request to get the JWT. - /// Then, it connects to WebSocket using the token. - pub async fn connect( - &mut self, - identifier: T, - password: Option, - ) -> Result<()> { + /// Uses pre-generated JWT by Turms. + pub async fn connect(&mut self, token: String) -> Result<()> { // Ensure the URL has a valid host. let host = { let host_str = self @@ -125,29 +105,10 @@ impl WebSocket { } }; - let scheme = self.get_scheme("http"); - let url = format!("{scheme}://{host}{AUTH_PATH}"); - - // Send request and get token. - let token = reqwest::Client::new() - .post(&url) - .json(&Auth { - vanity: identifier.to_string(), - password: password.map(|p| p.to_string()), - }) - .send() - .await? - .json::() - .await?; - - if token.status == Status::Error || token.data.is_empty() { - return Err(Error::AuthenticationFailed); - } - // Establish WebSocket connection. - let scheme = self.get_scheme("ws"); + let scheme = self.url.scheme(); let socket_url = - format!("{scheme}://{host}{SOCKET_PATH}?token={}", token.data); + format!("{scheme}://{host}{SOCKET_PATH}?token={token}"); let (mut socket, _response) = connect_async(&socket_url).await?; diff --git a/libturms/src/lib.rs b/libturms/src/lib.rs index 595fac1..361b239 100644 --- a/libturms/src/lib.rs +++ b/libturms/src/lib.rs @@ -69,13 +69,9 @@ impl Turms { } /// Init WebSocket connection and handle messages. - pub async fn connect( - mut self, - identifier: T, - password: Option, - ) -> Result { + pub async fn connect(mut self, token: T) -> Result { if let Some(ref mut turms) = self.turms { - turms.connect(identifier, password).await?; + turms.connect(token.to_string()).await?; let ws = turms.reader.clone().ok_or(error::ConnectionClosed)?; tokio::spawn(async move { From c5e5381c4b62ed536968a7e8ae1a2e84e519694f Mon Sep 17 00:00:00 2001 From: RealHinome Date: Mon, 27 Oct 2025 11:39:23 +0100 Subject: [PATCH 5/6] Fix examples --- libturms/examples/libturms/answer.rs | 4 ++-- libturms/examples/libturms/offer.rs | 4 ++-- libturms/examples/tokio.rs | 11 ++++++++++- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/libturms/examples/libturms/answer.rs b/libturms/examples/libturms/answer.rs index 3ffdc11..6a21db4 100644 --- a/libturms/examples/libturms/answer.rs +++ b/libturms/examples/libturms/answer.rs @@ -33,9 +33,9 @@ async fn main() { let mut managed_turms = Turms::from_config(libturms::ConfigFinder::::Text(config)) .unwrap() - .connect("user2", None) + /*.connect("user2", None) .await - .unwrap(); + .unwrap()*/; println!("Enter peer offer: "); diff --git a/libturms/examples/libturms/offer.rs b/libturms/examples/libturms/offer.rs index bea7db8..eb44370 100644 --- a/libturms/examples/libturms/offer.rs +++ b/libturms/examples/libturms/offer.rs @@ -33,9 +33,9 @@ async fn main() { let mut managed_turms = Turms::from_config(libturms::ConfigFinder::::Text(config)) .unwrap() - .connect("user", None) + /*.connect("user2", None) .await - .unwrap(); + .unwrap()*/; let offer = managed_turms.create_peer_offer().await.unwrap(); println!("My offer is: {offer}"); diff --git a/libturms/examples/tokio.rs b/libturms/examples/tokio.rs index d00fffa..3abc305 100644 --- a/libturms/examples/tokio.rs +++ b/libturms/examples/tokio.rs @@ -5,6 +5,8 @@ use libturms::p2p::models::*; use libturms::p2p::webrtc; use tracing_subscriber::prelude::*; +use std::io::{self, BufRead}; + const LOCAL_URL: &str = "http://localhost:4000"; #[tokio::main] @@ -20,9 +22,16 @@ async fn main() { .with(tracing_subscriber::fmt::layer()) .init(); + println!("Enter Turms token: "); + + let mut buffer = String::new(); + let stdin = io::stdin(); + let mut handle = stdin.lock(); + handle.read_line(&mut buffer).unwrap(); + let mut ws = websocket::WebSocket::new(LOCAL_URL).expect("URL is invalid."); - ws.connect("user", None) + ws.connect(buffer) .await .expect("Is the password wrong? Or server offline?"); From eb775d49d15da321a2165b21e251d8de0dfdaf16 Mon Sep 17 00:00:00 2001 From: RealHinome Date: Wed, 29 Oct 2025 20:12:37 +0100 Subject: [PATCH 6/6] Support X3DH --- Cargo.lock | 2 ++ libturms/Cargo.toml | 3 +- libturms/src/channel.rs | 62 ++++++++++++++++++++++++++++++-- p2p/Cargo.toml | 1 + p2p/src/lib.rs | 29 ++++++++------- p2p/src/models.rs | 10 ++++-- p2p/src/webrtc.rs | 78 +++++++++++++++++++++++++++++++---------- p2p/src/x3dh.rs | 58 ++++++++---------------------- 8 files changed, 159 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dfd8d9..c72efda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1272,6 +1272,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "vodozemac", "webrtc", ] @@ -1595,6 +1596,7 @@ name = "p2p" version = "0.1.0" dependencies = [ "bitflags 2.9.1", + "bytes", "chrono", "error", "serde", diff --git a/libturms/Cargo.toml b/libturms/Cargo.toml index f4b9004..69e037d 100644 --- a/libturms/Cargo.toml +++ b/libturms/Cargo.toml @@ -22,8 +22,10 @@ bench = false discover = { path = "../discover" } p2p = { path = "../p2p" } error = { path = "../error" } +vodozemac = "0.9" serde = { workspace = true } serde_yaml = "0.9" +serde_json = "1" webrtc = { workspace = true } tokio = { workspace = true, features = ["full"] } futures-util = "0.3" @@ -32,7 +34,6 @@ tracing = "0.1" [dev-dependencies] anyhow = "*" serde = "*" -serde_json = "1" tokio = { version = "1", features = ["full"] } futures-util = "0.3" tracing = "0.1" diff --git a/libturms/src/channel.rs b/libturms/src/channel.rs index 197ee84..24ccd78 100644 --- a/libturms/src/channel.rs +++ b/libturms/src/channel.rs @@ -1,9 +1,14 @@ //! Handle p2p (webrtc) data channel. -use p2p::triple_diffie_hellman; +use p2p::models::{Event, X3DH}; use p2p::webrtc::WebRTCManager; +use p2p::{get_account, triple_diffie_hellman}; +use tokio::sync::Mutex; +use vodozemac::olm::{OlmMessage, SessionConfig}; use webrtc::data_channel::data_channel_message::DataChannelMessage; +use std::sync::Arc; + #[derive(Clone, Debug)] struct Handler { webrtc: WebRTCManager, @@ -39,8 +44,59 @@ pub fn handle_channel(webrtc: WebRTCManager) { let h = handler.clone(); channel.on_message(Box::new(move |msg: DataChannelMessage| { let label = &h.label; - tracing::trace!(%label, ?msg, "message received"); - Box::pin(async {}) + tracing::trace!(%label, ?msg, "webrtc message received"); + + let mut webrtc = h.webrtc.clone(); + Box::pin(async move { + let data = match webrtc.session { + Some(session) => { + let message = match vodozemac::olm::Message::from_bytes(&msg.data) { + Ok(msg) => msg, + Err(_) => { + return; + } + }; + + session.lock().await.decrypt(&OlmMessage::from(message)).unwrap_or_else(|_| msg.data.to_vec()) + } + None => msg.data.to_vec(), + }; + + let Ok(json) = serde_json::from_slice(&data) else { + tracing::debug!("decoding failed"); + return; + }; + tracing::debug!(?json, "decoded webrtc message"); + + match json { + Event::DHKey(x3dh) => { + let mut account = get_account().lock().await; + let public_key = account.curve25519_key(); + if let Some(otk) = x3dh.otk { + let mut session: vodozemac::olm::Session = account.create_outbound_session(SessionConfig::version_2(), x3dh.public_key, otk); + let message = session.encrypt(""); + webrtc.session = Some(Arc::new(Mutex::new(session))); + if let OlmMessage::PreKey(pk) = message { + match serde_json::to_string(&Event::DHKey(X3DH { public_key, otk: None, prekey: Some(pk) })) { + Ok(message) => { + if let Err(err) = webrtc.send(message).await { + tracing::error!(%err, "failed to send message"); + } + } + Err(err) => tracing::error!(%err, "failed to serialize DHKey event"), + } + } + } else if let Some(prekey) = x3dh.prekey { + if let Err(err) = account.create_inbound_session(x3dh.public_key, &prekey) { + tracing::error!(%err, "failed to create inbound session"); + } + } else { + tracing::error!("received X3DH request without otk nor pre-key"); + } + }, + _ => unimplemented!(), + } + }) })); } } diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 9e4f58c..9fe7d9b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -13,3 +13,4 @@ bitflags = { version = "2", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } error = { path = "../error" } vodozemac = "0.9" +bytes = "1.10" diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 0ce7695..ecd9caf 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -10,30 +10,29 @@ mod x3dh; pub use x3dh::triple_diffie_hellman; -use std::sync::{Mutex, OnceLock}; +use tokio::sync::Mutex; +use vodozemac::olm::Account; -static ACCOUNT: OnceLock> = OnceLock::new(); +use std::sync::OnceLock; + +static ACCOUNT: OnceLock> = OnceLock::new(); + +pub fn get_account() -> &'static Mutex { + ACCOUNT.get_or_init(|| Mutex::new(Account::new())) +} /// Get user account. -pub fn save_account() -> error::Result { - let account = - ACCOUNT.get_or_init(|| Mutex::new(vodozemac::olm::Account::new())); - - Ok(serde_json::to_string( - &account - .lock() - .map_err(|_| error::Error::MutexPoisoned)? - .pickle(), - )?) +pub async fn save_account() -> error::Result { + let account = get_account(); + + Ok(serde_json::to_string(&account.lock().await.pickle())?) } /// Set user account. pub fn restore_account(json: &str) -> Result<(), serde_json::Error> { let pickle: vodozemac::olm::AccountPickle = serde_json::from_str(json)?; - let _ = ACCOUNT.get_or_init(|| { - Mutex::new(vodozemac::olm::Account::from_pickle(pickle)) - }); + let _ = ACCOUNT.get_or_init(|| Mutex::new(Account::from_pickle(pickle))); Ok(()) } diff --git a/p2p/src/models.rs b/p2p/src/models.rs index b2c24a0..7cfb5ed 100644 --- a/p2p/src/models.rs +++ b/p2p/src/models.rs @@ -1,6 +1,8 @@ use bitflags::bitflags; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use vodozemac::Curve25519PublicKey; +use vodozemac::olm::PreKeyMessage; /// Encapsulates events. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -13,9 +15,11 @@ pub enum Event { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct X3DH { /// Curve25519 public key. - pub public_key: Vec, - /// One-time-key. - pub otk: Vec, + pub public_key: Curve25519PublicKey, + /// One-time key. + pub otk: Option, + /// Receiver pre-key. + pub prekey: Option, } /// Represents a message in a chat. diff --git a/p2p/src/webrtc.rs b/p2p/src/webrtc.rs index bf6a056..a0e6531 100644 --- a/p2p/src/webrtc.rs +++ b/p2p/src/webrtc.rs @@ -1,5 +1,6 @@ use error::Result; -use vodozemac::olm::Account; +use tokio::sync::Mutex as AsyncMutex; +use tokio::time::{Duration, sleep}; use webrtc::api::APIBuilder; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::MediaEngine; @@ -15,7 +16,7 @@ use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use std::fmt; use std::sync::{Arc, Mutex}; -use crate::ACCOUNT; +const MAX_ATTEMPTS: u8 = 4; /// Simple WebRTC connection manager. #[derive(Clone)] @@ -26,6 +27,8 @@ pub struct WebRTCManager { pub ice: Arc>>, /// Data channel. pub channel: Option>, + /// Cryptographic session. + pub session: Option>>, /// Know if user is offerer. pub is_initiator: bool, offer: String, @@ -52,7 +55,7 @@ impl WebRTCManager { .build(); // If account is not initialized, consider creating a new one. - ACCOUNT.get_or_init(|| Mutex::new(Account::new())); + crate::get_account(); let peer_connection = Arc::new(api.new_peer_connection(config).await?); let webrtc = WebRTCManager { @@ -61,6 +64,7 @@ impl WebRTCManager { offer: String::default(), is_initiator: false, channel: None, + session: None, }; let ice = Arc::downgrade(&webrtc.ice); @@ -110,11 +114,13 @@ impl WebRTCManager { .await .recv() .await; - let offer = - self.peer_connection - .local_description() - .await - .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + let offer = self + .peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC( + webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription, + ))?; self.offer = serde_json::to_string(&offer)?; self.is_initiator = true; @@ -134,11 +140,13 @@ impl WebRTCManager { .await .recv() .await; - let answer = - self.peer_connection - .local_description() - .await - .ok_or(error::Error::WebRTC(webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription))?; + let answer = self + .peer_connection + .local_description() + .await + .ok_or(error::Error::WebRTC( + webrtc::error::Error::ErrPeerConnSDPTypeInvalidValueSetLocalDescription, + ))?; self.offer = serde_json::to_string(&answer)?; @@ -167,12 +175,9 @@ impl WebRTCManager { .create_data_channel("data", Some(dc_init)) .await?; - self.channel = Some(channel); + self.channel = Some(Arc::clone(&channel)); - Ok(self - .channel - .clone() - .ok_or(webrtc::Error::ErrConnectionClosed)?) + Ok(channel) } /// Convert a [`String`] to [`RTCSessionDescription`]. @@ -182,6 +187,43 @@ impl WebRTCManager { ) -> Result { Ok(serde_json::from_str(session)?) } + + /// Sender with retries. + /// Useful during X3DH negociation. + pub async fn send(&self, message: String) -> Result<()> { + let msg = match self.session.clone() { + Some(session) => { + session.lock().await.encrypt(message).message().to_vec() + }, + None => message.as_bytes().to_vec(), + }; + + match self.channel.as_ref() { + Some(ch) => { + for n in 0..MAX_ATTEMPTS { + if n > 0 { + // Wait only if first try is failed. + sleep(Duration::from_secs(u64::from(n) * 5)).await; + } + + match ch.send(&bytes::Bytes::from(msg.clone())).await { + Ok(_) => break, + Err(err) => { + tracing::error!(%err, "{n}th attempt to send message failed"); + if n == MAX_ATTEMPTS - 1 { + return Err(error::Error::MessageSendFailed); + } + }, + } + } + + Ok(()) + }, + None => { + Err(error::Error::WebRTC(webrtc::Error::ErrDataChannelNotOpen)) + }, + } + } } impl fmt::Debug for WebRTCManager { diff --git a/p2p/src/x3dh.rs b/p2p/src/x3dh.rs index 5be429e..cd3baac 100644 --- a/p2p/src/x3dh.rs +++ b/p2p/src/x3dh.rs @@ -1,16 +1,8 @@ use error::{Error, Result}; -use tokio::time::{Duration, sleep}; -use tracing::{debug, error}; -use vodozemac::olm::Account; -use crate::ACCOUNT; use crate::models::Event::DHKey; use crate::models::X3DH; -use std::sync::Mutex; - -const MAX_ATTEMPTS: u8 = 4; - /// Spawn triple diffie hellman generation logic over WebRTC. pub async fn triple_diffie_hellman( acc: &crate::webrtc::WebRTCManager, @@ -19,58 +11,36 @@ pub async fn triple_diffie_hellman( return Ok(()); } - let account = ACCOUNT.get_or_init(|| Mutex::new(Account::new())); + let account = crate::get_account(); // Generate public key and one-time key. let (public_key, otk) = { - let mut account = match account.lock() { - Ok(a) => a, - Err(_) => { - error!("account mutex is poisoned"); - return Err(Error::MutexPoisoned); - }, - }; + let mut account = account.lock().await; account.generate_one_time_keys(1); - let public_key = account.curve25519_key().to_vec(); + let public_key = account.curve25519_key(); let otk = match account.one_time_keys().values().next() { - Some(k) => k.to_vec(), + Some(k) => *k, None => { // Since insertion occurs before, this should never happen here. return Err(Error::MutexPoisoned); }, }; - (public_key, otk) + (public_key, Some(otk)) }; - let message = serde_json::to_string(&DHKey(X3DH { public_key, otk }))?; - - if let Some(ch) = acc.channel.as_ref() { - for n in 0..MAX_ATTEMPTS { - if n > 0 { - // Wait only if first try is failed. - sleep(Duration::from_secs(u64::from(n) * 5)).await; - } + let message = serde_json::to_string(&DHKey(X3DH { + public_key, + otk, + prekey: None, + }))?; - match ch.send_text(&message).await { - Ok(_) => { - if let Ok(mut acc_lock) = account.lock() { - acc_lock.mark_keys_as_published(); - debug!("public key and one-time key published"); - } - break; - }, - Err(err) => { - error!(%err, "{n}th attempt to send keys to a peer failed"); - if n == MAX_ATTEMPTS - 1 { - return Err(Error::MessageSendFailed); - } - }, - } - } - } + if acc.send(message).await.is_ok() { + account.lock().await.mark_keys_as_published(); + tracing::debug!("public key and one-time key published"); + }; Ok(()) }