Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/hotfix/src/initiator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::config::SessionConfig;
use crate::message::FixMessage;
use crate::session::SessionRef;
use crate::store::MessageStore;
use crate::transport::FixConnection;
use crate::transport::build_connection;

pub struct Initiator<M> {
pub config: SessionConfig,
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn establish_connection<M: FixMessage>(config: SessionConfig, session_ref:
}
session_ref.await_active_session_time().await;

match FixConnection::connect(&config, session_ref.clone()).await {
match build_connection(&config, session_ref.clone()).await {
Ok(conn) => {
session_ref.register_writer(conn.get_writer()).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/hotfix/src/session/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::message::parser::RawFixMessage;
use crate::session::event::AwaitingActiveSessionResponse;
use crate::transport::socket_writer::WriterRef;
use crate::transport::{actor::WriterModel, socket_writer::WriterRef};
use hotfix_message::message::Message;
use std::collections::VecDeque;
use tokio::sync::oneshot;
Expand Down
3 changes: 2 additions & 1 deletion crates/hotfix/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub(crate) mod actor;
mod connection;
pub(crate) mod socket_reader;
pub(crate) mod socket_writer;
mod tcp;
mod tls;

pub use connection::FixConnection;
pub use connection::build_connection;
38 changes: 38 additions & 0 deletions crates/hotfix/src/transport/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::fmt::Debug;

use async_trait::async_trait;
use tokio::{io::AsyncWriteExt, task::JoinHandle};
use tracing::debug;

use crate::message::parser::RawFixMessage;

#[async_trait]
pub trait Actor<M: Send> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is possibly a good abstraction, but I don't see us taking advantage of it anywhere in this PR currently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I wouldn't conflate this PR with the creation of a more generic actor framework. It requires a bit more consideration as not all actors fit this exact implementation in fn run.

fn run(mut self) -> JoinHandle<()>
where
Self: 'static + Sized + Send,
{
tokio::spawn(async move {
while let Some(msg) = self.next().await {
if !self.handle(msg).await {
break;
}
}
debug!("writer loop is shutting down");
})
}
async fn next(&mut self) -> Option<M>;
async fn handle(&mut self, message: M) -> bool;
}

#[async_trait]
pub trait WriterModel: Clone + Debug {
/// Create a new writer model and Spawn Actor Counterpart
fn new<W: AsyncWriteExt + Unpin + Send + 'static>(writer: W) -> Self;

/// Send a RawFix Message
async fn send_raw_message(&self, msg: RawFixMessage);

/// Disconnect from Actor
async fn disconnect(&self);
}
61 changes: 35 additions & 26 deletions crates/hotfix/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,19 @@ use tokio::io::{AsyncRead, AsyncWrite};
use crate::config::SessionConfig;
use crate::message::FixMessage;
use crate::session::SessionRef;
use crate::transport::actor::WriterModel;
use crate::transport::socket_reader::ReaderRef;
use crate::transport::socket_writer::WriterRef;
use crate::transport::tcp::create_tcp_connection;
use crate::transport::tls::create_tcp_over_tls_connection;

pub struct FixConnection {
_writer: WriterRef,
pub struct FixConnection<W: WriterModel> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this isn't on the right track - there is no need for the FixConnection to be generic. This relates to how I don't think the WriterRef needs to be further abstracted either.

_writer: W,
_reader: ReaderRef,
}

impl FixConnection {
pub async fn connect(
config: &SessionConfig,
session_ref: SessionRef<impl FixMessage>,
) -> io::Result<Self> {
let use_tls = config.tls_config.is_some();

let conn = if use_tls {
let stream = create_tcp_over_tls_connection(config).await?;
_create_io_refs(session_ref.clone(), stream).await
} else {
let stream = create_tcp_connection(config).await?;
_create_io_refs(session_ref.clone(), stream).await
};

Ok(conn)
}

pub fn get_writer(&self) -> WriterRef {
impl<W: WriterModel> FixConnection<W> {
pub fn get_writer(&self) -> W {
self._writer.clone()
}

Expand All @@ -41,7 +25,35 @@ impl FixConnection {
}
}

async fn _create_io_refs<M, Stream>(session_ref: SessionRef<M>, stream: Stream) -> FixConnection
impl From<(WriterRef, ReaderRef)> for FixConnection<WriterRef> {
fn from(refs: (WriterRef, ReaderRef)) -> Self {
let (_writer, _reader) = refs;
FixConnection { _writer, _reader }
}
}

/// Spawn a TCP or TLS FIX Connection
pub async fn build_connection(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This being moved out of FixConnection is a good step.

config: &SessionConfig,
session_ref: SessionRef<impl FixMessage>,
) -> io::Result<FixConnection<WriterRef>> {
let use_tls = config.tls_config.is_some();

let conn = if use_tls {
let stream = create_tcp_over_tls_connection(config).await?;
_create_io_refs(session_ref.clone(), stream).await
} else {
let stream = create_tcp_connection(config).await?;
_create_io_refs(session_ref.clone(), stream).await
};

Ok(conn)
}

async fn _create_io_refs<M, Stream>(
session_ref: SessionRef<M>,
stream: Stream,
) -> FixConnection<WriterRef>
where
M: FixMessage,
Stream: AsyncRead + AsyncWrite + Send + 'static,
Expand All @@ -51,8 +63,5 @@ where
let writer_ref = WriterRef::new(writer);
let reader_ref = ReaderRef::new(reader, session_ref);

FixConnection {
_writer: writer_ref,
_reader: reader_ref,
}
FixConnection::from((writer_ref, reader_ref))
}
16 changes: 6 additions & 10 deletions crates/hotfix/src/transport/socket_reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tokio::io::{AsyncRead, AsyncReadExt, ReadHalf};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::oneshot;
use tracing::debug;

Expand All @@ -16,7 +16,7 @@ pub struct ReaderRef {

impl ReaderRef {
pub fn new<M: FixMessage>(
reader: ReadHalf<impl AsyncRead + Send + 'static>,
reader: impl AsyncRead + Send + Unpin + 'static,
session_ref: SessionRef<M>,
) -> Self {
let (dc_sender, dc_receiver) = oneshot::channel();
Expand All @@ -35,18 +35,14 @@ impl ReaderRef {
}
}

struct ReaderActor<M, R> {
reader: ReadHalf<R>,
struct ReaderActor<M, R: AsyncRead> {
reader: R,
session_ref: SessionRef<M>,
dc_sender: oneshot::Sender<()>,
}

impl<M, R: AsyncRead> ReaderActor<M, R> {
fn new(
reader: ReadHalf<R>,
session_ref: SessionRef<M>,
dc_sender: oneshot::Sender<()>,
) -> Self {
fn new(reader: R, session_ref: SessionRef<M>, dc_sender: oneshot::Sender<()>) -> Self {
Self {
reader,
session_ref,
Expand All @@ -58,7 +54,7 @@ impl<M, R: AsyncRead> ReaderActor<M, R> {
async fn run_reader<M, R>(mut actor: ReaderActor<M, R>)
where
M: FixMessage,
R: AsyncRead,
R: AsyncRead + Unpin,
{
let mut parser = Parser::default();
loop {
Expand Down
89 changes: 55 additions & 34 deletions crates/hotfix/src/transport/socket_writer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use tokio::io::{AsyncWrite, AsyncWriteExt, WriteHalf};
use async_trait::async_trait;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tracing::{debug, warn};

use crate::message::parser::RawFixMessage;
use crate::transport::actor::{Actor, WriterModel};

#[derive(Clone, Debug)]
pub enum WriterMessage {
Expand All @@ -15,40 +17,19 @@ pub struct WriterRef {
sender: mpsc::Sender<WriterMessage>,
}

impl WriterRef {
pub fn new(writer: WriteHalf<impl AsyncWrite + Send + 'static>) -> Self {
let (sender, mailbox) = mpsc::channel(10);
let actor = WriterActor::new(writer, mailbox);
tokio::spawn(run_writer(actor));

Self { sender }
}

pub async fn send_raw_message(&self, msg: RawFixMessage) {
self.sender
.send(WriterMessage::SendMessage(msg))
.await
.expect("be able to send message");
}

pub async fn disconnect(&self) {
self.sender
.send(WriterMessage::Disconnect)
.await
.expect("be able to disconnect")
}
}

struct WriterActor<W> {
writer: WriteHalf<W>,
pub struct WriterActor<W: AsyncWrite> {
writer: W,
mailbox: mpsc::Receiver<WriterMessage>,
}

impl<W: AsyncWrite> WriterActor<W> {
fn new(writer: WriteHalf<W>, mailbox: mpsc::Receiver<WriterMessage>) -> Self {
fn new(writer: W, mailbox: mpsc::Receiver<WriterMessage>) -> Self {
Self { writer, mailbox }
}
}

#[async_trait]
impl<W: AsyncWrite + Send + Unpin + 'static> Actor<WriterMessage> for WriterActor<W> {
async fn handle(&mut self, message: WriterMessage) -> bool {
match message {
WriterMessage::SendMessage(fix_message) => {
Expand All @@ -63,14 +44,54 @@ impl<W: AsyncWrite> WriterActor<W> {
WriterMessage::Disconnect => false,
}
}
async fn next(&mut self) -> Option<WriterMessage> {
self.mailbox.recv().await
}
}

async fn run_writer<W: AsyncWrite>(mut actor: WriterActor<W>) {
while let Some(msg) = actor.mailbox.recv().await {
if !actor.handle(msg).await {
break;
}
#[async_trait]
impl WriterModel for WriterRef {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to implement a trait for WriterRef? Other than the new function, it doesn't know about the writer, and I'm not even sure we'll need a writer in the mock implementation.

fn new<W: AsyncWrite + Send + Unpin + 'static>(writer: W) -> Self {
let (sender, mailbox) = mpsc::channel(10);
let actor = WriterActor::new(writer, mailbox);
actor.run();

Self { sender }
}

debug!("writer loop is shutting down");
async fn send_raw_message(&self, msg: RawFixMessage) {
self.sender
.send(WriterMessage::SendMessage(msg))
.await
.expect("be able to send message");
}

async fn disconnect(&self) {
self.sender
.send(WriterMessage::Disconnect)
.await
.expect("be able to disconnect")
}
}

#[cfg(test)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know in this codebase we put the tests on a separate file but due to the module visibility restrictions I put it here just for showing the use case. Can move the test to a separate file and or get rid of it depending on the outcome of the discussion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We very much put unit tests in the same file. I only broke out the message store tests because spinning up a mongodb container in unit tests felt wrong.

mod tests {
use super::WriterRef;
use crate::message::parser::RawFixMessage;
use crate::transport::actor::WriterModel;
use tokio::io::AsyncReadExt;

#[tokio::test]
async fn test_mocked_writer() {
let (writer, mut reader) = tokio::io::duplex(10);
let writer_ref = WriterRef::new(writer);

writer_ref
.send_raw_message(RawFixMessage::new(vec![1, 2, 3]))
.await;

assert_eq!(1, reader.read_u8().await.unwrap());
assert_eq!(2, reader.read_u8().await.unwrap());
assert_eq!(3, reader.read_u8().await.unwrap());
}
}