Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name pollinate)
(public_name pollinate)
(libraries pollinate.common pollinate.node))
(libraries pollinate.common pollinate.pnode))
18 changes: 8 additions & 10 deletions lib/node/client.ml → lib/pnode/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ let create_request node ?(request_ack = false) recipient payload =
Lwt.return
Message.
{
category = Message.Request;
sub_category_opt = None;
pollinate_category = Message.Request;
request_ack;
id = !id;
timestamp = Unix.gettimeofday ();
Expand All @@ -40,8 +39,7 @@ let create_request node ?(request_ack = false) recipient payload =
let create_response node ?(request_ack = false) request payload =
Message.
{
category = Message.Response;
sub_category_opt = None;
pollinate_category = Message.Response;
request_ack;
id = request.id;
timestamp = Unix.gettimeofday ();
Expand All @@ -53,10 +51,9 @@ let create_response node ?(request_ack = false) request payload =
let create_post node ?(request_ack = false) payload =
Message.
{
category = Message.Post;
pollinate_category = Message.Post;
request_ack;
id = -1;
sub_category_opt = None;
timestamp = Unix.gettimeofday ();
sender = !node.address;
recipients = [];
Expand All @@ -66,8 +63,7 @@ let create_post node ?(request_ack = false) payload =
let create_ack node incoming_message =
Message.
{
category = Message.Acknowledgment;
sub_category_opt = None;
pollinate_category = Message.Acknowledgment;
request_ack = false;
id = -1;
timestamp = Unix.gettimeofday ();
Expand All @@ -76,12 +72,14 @@ let create_ack node incoming_message =
payload = incoming_message |> Message.hash_of |> Bytes.of_string;
}

let request node request recipient =
let%lwt message = create_request node recipient request in
let request node message =
let%lwt () = Networking.send_to node message in
let condition_var = Lwt_condition.create () in
Hashtbl.add !node.request_table message.id condition_var;
Lwt_condition.wait condition_var

let post node message =
!node.disseminator <- Disseminator.post !node.disseminator message

let broadcast node message recipients =
Networking.broadcast node message recipients
4 changes: 3 additions & 1 deletion lib/node/client.mli → lib/pnode/client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ val create_response :
returns a promise holding the response from the peer. This
function blocks the current thread of execution until a response
arrives. *)
val request : node ref -> bytes -> Address.t -> Message.t Lwt.t
val request : node ref -> Message.t -> Message.t Lwt.t

(** [create_post node payload] creates a [Message.t] of the {i Post category}
containing {i payload} for eventual gossip dissemination across the
entire network. *)
val create_post : node ref -> ?request_ack:bool -> bytes -> Message.t

val create_ack : node ref -> Message.t -> Message.t

val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions lib/node/dune → lib/pnode/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(library
(name node)
(public_name pollinate.node)
(name pnode)
(public_name pollinate.pnode)
(libraries bin_prot lwt lwt.unix pollinate.common)
(preprocess
(pps ppx_bin_prot ppx_deriving.show lwt_ppx)))
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ let update_peer_status node peer status =
let create_message node message recipient =
Message.
{
category = Failure_detection;
pollinate_category = Failure_detection;
request_ack = false;
sub_category_opt = None;
id = -1;
timestamp = Unix.gettimeofday ();
sender = Client.address_of !node;
Expand Down
File renamed without changes.
5 changes: 2 additions & 3 deletions lib/node/message.ml → lib/pnode/message.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
open Common
open Bin_prot.Std

type category =
type pollinate_category =
| Uncategorized
| Acknowledgment
| Request
Expand All @@ -12,8 +12,7 @@ type category =
[@@deriving bin_io, show]

type t = {
category : category;
sub_category_opt : (string * string) option;
pollinate_category : pollinate_category;
request_ack : bool;
id : int;
timestamp : float;
Expand Down
5 changes: 2 additions & 3 deletions lib/node/message.mli → lib/pnode/message.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ open Common
(** Messages are {i requests} or {i responses},
determining how they are stored and
where they are handled. *)
type category =
type pollinate_category =
| Uncategorized
| Acknowledgment
| Request
Expand All @@ -23,8 +23,7 @@ type category =
(** Messages received from [peers] which are
processed by the node's message handler. *)
type t = {
category : category;
sub_category_opt : (string * string) option;
pollinate_category : pollinate_category;
request_ack : bool;
id : int;
timestamp : float;
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions lib/node/networking.mli → lib/pnode/networking.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ to a specified peer within the [Message.t]. Construct a message with one of the
[create_*] functions to then feed to this function. *)
val send_to : node ref -> Message.t -> unit Lwt.t

(** Same as `send_to` but to several recipients *)
val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t

(** Waits for the next incoming message and returns it. *)
val recv_next : node ref -> Message.t Lwt.t

Expand Down
File renamed without changes.
File renamed without changes.
17 changes: 9 additions & 8 deletions lib/node/server.ml → lib/pnode/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ let handle_ack node msg =

Otherwise, we just apply the message handler and that's it.
*)
let process_message node preprocessor msg_handler =
let process_message node preprocessor (msg_handler : Message.t -> bytes option)
=
let open Message in
let%lwt message = Networking.recv_next node in
let message = preprocessor message in
Expand All @@ -43,7 +44,7 @@ let process_message node preprocessor msg_handler =
(Printf.sprintf "Processing message %s from %d...\n"
(Message.hash_of message) message.sender.port) in *)
let%lwt () =
match message.category with
match message.pollinate_category with
| Response -> Lwt.return (handle_response node message)
| Request -> (
(* let%lwt () =
Expand All @@ -52,11 +53,11 @@ let process_message node preprocessor msg_handler =
!node.address.address !node.address.port message.sender.address
message.sender.port) in *)
match msg_handler message with
| Some response ->
response
|> Client.create_response node message
|> Networking.send_to node
| None -> Lwt.return ())
| Some payload ->
(* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *)
Client.create_response node message payload |> Networking.send_to node
| None ->
failwith "received request without payload nor payload_signature")
| Acknowledgment ->
let msg_hash = Bytes.to_string message.payload in
let new_addrs =
Expand Down Expand Up @@ -128,7 +129,7 @@ let _print_logs node =
3. Run the disseminator, this includes actually sending messages to be
disseminated across the network.
4. Wait 0.001 seconds before restarting the procedure. *)
let rec run node preprocessor msg_handler =
let rec run node preprocessor (msg_handler : Message.t -> bytes option) =
(* Step 0 *)
(* let%lwt () = print_logs node in *)
(* Step 1 *)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion lib/pollinate.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Address = Common.Address
module Util = Common.Util
module Peer = Common.Peer
module Node = Node
module PNode = Pnode
14 changes: 8 additions & 6 deletions test/commons.ml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
(** Utils function shared by the different tests modules *)
module Commons = struct
open Pollinate.Node
open Pollinate.PNode
open Pollinate.Util
open Messages
open Message

let preprocessor msg =
let open Messages in
match msg.Message.category with
match msg.Message.pollinate_category with
| Request ->
let[@warning "-8"] (Request r) =
Encoding.unpack bin_read_message msg.Message.payload in
Expand All @@ -19,15 +20,16 @@ module Commons = struct

let msg_handler message =
let open Messages in
let open Message in
match message.category with
match message.pollinate_category with
| Request ->
let request = Encoding.unpack bin_read_request message.payload in
let response =
match request with
| Ping -> Pong
| Get -> Pong
| Insert _ -> Success "Successfully added value to state" in
Response response |> Encoding.pack bin_writer_message |> Option.some
| _ -> None
let msg = Response response |> Encoding.pack bin_writer_message in
Some msg
| Post -> None
| _ -> failwith "unhandled in tests"
end
50 changes: 25 additions & 25 deletions test/disseminator_tests.ml
Original file line number Diff line number Diff line change
@@ -1,53 +1,53 @@
open Lwt.Infix
open Commons
open Pollinate
open Pollinate.Node
open Pollinate.PNode

module Disseminator_tests = struct
let node =
Lwt_main.run
(let%lwt node_a =
Node.init ~init_peers:[] Address.{ address = "127.0.0.1"; port = 5000 }
in
Pnode.init ~init_peers:[]
Address.{ address = "127.0.0.1"; port = 5000 } in
Lwt.return node_a)

let queue_insertion_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node
|> Client.post node;
let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes in
Client.create_post node payload |> Client.post node;

Lwt.return (List.length (Node.Testing.broadcast_queue node))
Lwt.return (List.length (Pnode.Testing.broadcast_queue node))

let queue_removal_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node
|> Client.post node;
let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes in
Client.create_post node payload |> Client.post node;

let%lwt () =
while%lwt Node.Testing.disseminator_round node <= 10 do
while%lwt Pnode.Testing.disseminator_round node <= 10 do
Lwt_unix.sleep 0.1
done in
Lwt.return (List.length (Node.Testing.broadcast_queue node))
Lwt.return (List.length (Pnode.Testing.broadcast_queue node))

let seen_message_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
let message =
let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node in
|> String.to_bytes in
let message = Client.create_post node payload in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will always advocate for long pipelines, but ultimately doesn't matter 😄

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not fond of long pipelines, I will let as it is right now 😉

message |> Client.post node;

Lwt.return (Node.seen node message)
Lwt.return (Pnode.seen node message)
end

(** Test for dissemination given a specific node. *)
Expand Down
6 changes: 3 additions & 3 deletions test/failure_detector_prop.ml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
open QCheck2.Gen
open Pollinate.Peer
open Pollinate
module SUT = Pollinate.Node.Testing.Failure_detector
module SUT = Pollinate.PNode.Testing.Failure_detector

let node_a =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 })
Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3002 })

let update_peer =
QCheck2.Test.make ~count:1000
~name:"update_neighbor_status successfully update neighbor status"
(pair Generators.peer_gen Generators.peer_status_gen)
(fun (neighbor, neighbor_status) ->
let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) neighbor in
let _ = add_neighbor (PNode.Client.peer_from !node_a) neighbor in
let _ = SUT.update_peer_status node_a neighbor neighbor_status in
neighbor.status = neighbor_status)

Expand Down
12 changes: 6 additions & 6 deletions test/failure_detector_tests.ml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
open Pollinate
open Pollinate.Node
open Pollinate.PNode
open Lwt.Infix
module SUT = Pollinate.Node.Testing.Failure_detector
module SUT = Pollinate.PNode.Testing.Failure_detector

let node_a =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 })
Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3003 })

let node_b =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3004 })
Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3004 })

let peer_b = Client.peer_from !node_b

let failure_detection () =
let open Common.Peer in
let open Client in
let _ = add_peer_as_is !node_a peer_b in
let _ = Pollinate.Node.Client.peer_from !node_a in
let _ = PNode.Client.peer_from !node_a in
let _ = SUT.update_peer_status node_a peer_b Suspicious in
(* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *)
let%lwt _ = Lwt_unix.sleep 9.1 in
Expand All @@ -24,7 +24,7 @@ let failure_detection () =

let failure_detection_nothing_on_alive () =
let open Common.Peer in
let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) peer_b in
let _ = add_neighbor (PNode.Client.peer_from !node_a) peer_b in
let _ = SUT.update_peer_status node_a peer_b Alive in
let%lwt _ = SUT.failure_detection node_a in
Lwt.return (Base.Hashtbl.length !node_a.peers = 1)
Expand Down
Loading