Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 4 additions & 43 deletions discover/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::models::phoenix::Message as PhxMessage;
use crate::models::response::{Response, Status};

type _Sender =
SplitSink<TungsteniteWebSocket<MaybeTlsStream<TcpStream>>, Message>;
Expand All @@ -27,7 +26,6 @@ type Reader =

const DEFAULT_QUEUED_MESSAGE: usize = 32;
const SOCKET_PATH: &str = "/socket/websocket";
const AUTH_PATH: &str = "/api/auth";

/// WebSocket client manager.
#[allow(dead_code)]
Expand Down Expand Up @@ -73,12 +71,6 @@ pub struct WebSocket {
max_queued_message: usize,
}

#[derive(Debug, Serialize)]
struct Auth {
vanity: String,
password: Option<String>,
}

impl WebSocket {
/// Create a new [`WebSocket`] without connecting it.
pub fn new<T: AsRef<str>>(url: T) -> Result<Self> {
Expand All @@ -96,22 +88,10 @@ impl WebSocket {
})
}

fn get_scheme(&self, base: &str) -> String {
match self.url.scheme() {
"https" | "wss" => format!("{base}s"),
_ => base.to_owned(),
}
}

/// Establish the WebSocket connection.
///
/// First, it makes an HTTP request to get the JWT.
/// Then, it connects to WebSocket using the token.
pub async fn connect<T: ToString>(
&mut self,
identifier: T,
password: Option<T>,
) -> Result<()> {
/// Uses pre-generated JWT by Turms.
pub async fn connect(&mut self, token: String) -> Result<()> {
// Ensure the URL has a valid host.
let host = {
let host_str = self
Expand All @@ -125,29 +105,10 @@ impl WebSocket {
}
};

let scheme = self.get_scheme("http");
let url = format!("{scheme}://{host}{AUTH_PATH}");

// Send request and get token.
let token = reqwest::Client::new()
.post(&url)
.json(&Auth {
vanity: identifier.to_string(),
password: password.map(|p| p.to_string()),
})
.send()
.await?
.json::<Response>()
.await?;

if token.status == Status::Error || token.data.is_empty() {
return Err(Error::AuthenticationFailed);
}

// Establish WebSocket connection.
let scheme = self.get_scheme("ws");
let scheme = self.url.scheme();
let socket_url =
format!("{scheme}://{host}{SOCKET_PATH}?token={}", token.data);
format!("{scheme}://{host}{SOCKET_PATH}?token={token}");

let (mut socket, _response) = connect_async(&socket_url).await?;

Expand Down
3 changes: 2 additions & 1 deletion libturms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ bench = false
discover = { path = "../discover" }
p2p = { path = "../p2p" }
error = { path = "../error" }
vodozemac = "0.9"
serde = { workspace = true }
serde_yaml = "0.9"
serde_json = "1"
webrtc = { workspace = true }
tokio = { workspace = true, features = ["full"] }
futures-util = "0.3"
Expand All @@ -32,7 +34,6 @@ tracing = "0.1"
[dev-dependencies]
anyhow = "*"
serde = "*"
serde_json = "1"
tokio = { version = "1", features = ["full"] }
futures-util = "0.3"
tracing = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions libturms/examples/libturms/answer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ async fn main() {
let mut managed_turms =
Turms::from_config(libturms::ConfigFinder::<String>::Text(config))
.unwrap()
.connect("user2", None)
/*.connect("user2", None)
.await
.unwrap();
.unwrap()*/;

println!("Enter peer offer: ");

Expand Down
4 changes: 2 additions & 2 deletions libturms/examples/libturms/offer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ async fn main() {
let mut managed_turms =
Turms::from_config(libturms::ConfigFinder::<String>::Text(config))
.unwrap()
.connect("user", None)
/*.connect("user2", None)
.await
.unwrap();
.unwrap()*/;

let offer = managed_turms.create_peer_offer().await.unwrap();
println!("My offer is: {offer}");
Expand Down
11 changes: 10 additions & 1 deletion libturms/examples/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use libturms::p2p::models::*;
use libturms::p2p::webrtc;
use tracing_subscriber::prelude::*;

use std::io::{self, BufRead};

const LOCAL_URL: &str = "http://localhost:4000";

#[tokio::main]
Expand All @@ -20,9 +22,16 @@ async fn main() {
.with(tracing_subscriber::fmt::layer())
.init();

println!("Enter Turms token: ");

let mut buffer = String::new();
let stdin = io::stdin();
let mut handle = stdin.lock();
handle.read_line(&mut buffer).unwrap();

let mut ws = websocket::WebSocket::new(LOCAL_URL).expect("URL is invalid.");

ws.connect("user", None)
ws.connect(buffer)
.await
.expect("Is the password wrong? Or server offline?");

Expand Down
108 changes: 93 additions & 15 deletions libturms/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,102 @@
//! Handle p2p (webrtc) data channel.

use webrtc::data_channel::RTCDataChannel;
use p2p::models::{Event, X3DH};
use p2p::webrtc::WebRTCManager;
use p2p::{get_account, triple_diffie_hellman};
use tokio::sync::Mutex;
use vodozemac::olm::{OlmMessage, SessionConfig};
use webrtc::data_channel::data_channel_message::DataChannelMessage;

use std::sync::Arc;

use crate::Turms;
#[derive(Clone, Debug)]
struct Handler {
webrtc: WebRTCManager,
label: String,
}

/// Handle channel by parsing income messages.
pub fn handle_channel(_turms: &mut Turms, channel: Arc<RTCDataChannel>) {
let label = channel.label().to_owned();
let d_label = label.clone();

channel.on_open(Box::new(move || {
tracing::info!(%label, "new channel opened");
Box::pin(async {})
}));

channel.on_message(Box::new(move |msg: DataChannelMessage| {
tracing::trace!(%d_label, ?msg, "message received");
Box::pin(async {})
}));
pub fn handle_channel(webrtc: WebRTCManager) {
let Some(channel) = webrtc.channel.clone() else {
tracing::error!("no WebRTC channel");
return;
};

let handler = Handler {
webrtc,
label: channel.label().to_owned(),
};

{
let h = handler.clone();
channel.on_open(Box::new(move || {
let label = &h.label;
tracing::info!(%label, "new channel opened");
Box::pin(async move {
if let Err(err) = triple_diffie_hellman(&h.webrtc).await {
tracing::error!(%err, "X3DH failed");
};
})
}));
}

{
let h = handler.clone();
channel.on_message(Box::new(move |msg: DataChannelMessage| {
let label = &h.label;
tracing::trace!(%label, ?msg, "webrtc message received");

let mut webrtc = h.webrtc.clone();
Box::pin(async move {
let data = match webrtc.session {
Some(session) => {
let message = match vodozemac::olm::Message::from_bytes(&msg.data) {
Ok(msg) => msg,
Err(_) => {
return;
}
};

session.lock().await.decrypt(&OlmMessage::from(message)).unwrap_or_else(|_| msg.data.to_vec())
}
None => msg.data.to_vec(),
};

let Ok(json) = serde_json::from_slice(&data) else {
tracing::debug!("decoding failed");
return;
};
tracing::debug!(?json, "decoded webrtc message");

match json {
Event::DHKey(x3dh) => {
let mut account = get_account().lock().await;
let public_key = account.curve25519_key();
if let Some(otk) = x3dh.otk {
let mut session: vodozemac::olm::Session = account.create_outbound_session(SessionConfig::version_2(), x3dh.public_key, otk);
let message = session.encrypt("");
webrtc.session = Some(Arc::new(Mutex::new(session)));
if let OlmMessage::PreKey(pk) = message {
match serde_json::to_string(&Event::DHKey(X3DH { public_key, otk: None, prekey: Some(pk) })) {
Ok(message) => {
if let Err(err) = webrtc.send(message).await {
tracing::error!(%err, "failed to send message");
}
}
Err(err) => tracing::error!(%err, "failed to serialize DHKey event"),
}
}
} else if let Some(prekey) = x3dh.prekey {
if let Err(err) = account.create_inbound_session(x3dh.public_key, &prekey) {
tracing::error!(%err, "failed to create inbound session");
}
} else {
tracing::error!("received X3DH request without otk nor pre-key");
}
},
_ => unimplemented!(),
}
})
}));
}
}
16 changes: 6 additions & 10 deletions libturms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,9 @@ impl Turms {
}

/// Init WebSocket connection and handle messages.
pub async fn connect<T: ToString>(
mut self,
identifier: T,
password: Option<T>,
) -> Result<Self> {
pub async fn connect<T: ToString>(mut self, token: T) -> Result<Self> {
if let Some(ref mut turms) = self.turms {
turms.connect(identifier, password).await?;
turms.connect(token.to_string()).await?;

let ws = turms.reader.clone().ok_or(error::ConnectionClosed)?;
tokio::spawn(async move {
Expand All @@ -96,8 +92,8 @@ impl Turms {
pub async fn create_peer_offer(&mut self) -> Result<String> {
let mut webrtc = WebRTCManager::init().await?;

let channel = webrtc.create_channel().await?;
channel::handle_channel(self, channel);
let _channel = webrtc.create_channel().await?;
channel::handle_channel(webrtc.clone());

let offer = webrtc.create_offer().await?;
// use offer-answer common datas later.
Expand Down Expand Up @@ -127,8 +123,8 @@ impl Turms {
pub async fn answer_to_peer(&mut self, offer: String) -> Result<String> {
let mut webrtc = WebRTCManager::init().await?;

let channel = webrtc.create_channel().await?;
channel::handle_channel(self, channel);
let _channel = webrtc.create_channel().await?;
channel::handle_channel(webrtc.clone());

let offer = webrtc.connect(offer).await?;

Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bitflags = { version = "2", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
error = { path = "../error" }
vodozemac = "0.9"
bytes = "1.10"
31 changes: 16 additions & 15 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,31 @@ pub mod webrtc;
/// X3DH over WebRTC for Turms.
mod x3dh;

use std::sync::{Mutex, OnceLock};
pub use x3dh::triple_diffie_hellman;

static ACCOUNT: OnceLock<Mutex<vodozemac::olm::Account>> = OnceLock::new();
use tokio::sync::Mutex;
use vodozemac::olm::Account;

use std::sync::OnceLock;

static ACCOUNT: OnceLock<Mutex<Account>> = OnceLock::new();

pub fn get_account() -> &'static Mutex<Account> {
ACCOUNT.get_or_init(|| Mutex::new(Account::new()))
}

/// Get user account.
pub fn save_account() -> error::Result<String> {
let account =
ACCOUNT.get_or_init(|| Mutex::new(vodozemac::olm::Account::new()));

Ok(serde_json::to_string(
&account
.lock()
.map_err(|_| error::Error::MutexPoisoned)?
.pickle(),
)?)
pub async fn save_account() -> error::Result<String> {
let account = get_account();

Ok(serde_json::to_string(&account.lock().await.pickle())?)
}

/// Set user account.
pub fn restore_account(json: &str) -> Result<(), serde_json::Error> {
let pickle: vodozemac::olm::AccountPickle = serde_json::from_str(json)?;

let _ = ACCOUNT.get_or_init(|| {
Mutex::new(vodozemac::olm::Account::from_pickle(pickle))
});
let _ = ACCOUNT.get_or_init(|| Mutex::new(Account::from_pickle(pickle)));

Ok(())
}
Loading