Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name pollinate)
(public_name pollinate)
(libraries pollinate.common pollinate.node))
(libraries pollinate.common pollinate.pnode))
6 changes: 0 additions & 6 deletions lib/node/dune

This file was deleted.

13 changes: 8 additions & 5 deletions lib/node/client.ml → lib/pnode/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let peer_from { address; peers; _ } =
let add_peer node (peer : Peer.t) =
Base.Hashtbl.add node.peers ~key:peer.address ~data:peer

let create_request node recipient payload =
let create_request node recipient (payload, payload_signature) =
Mutex.with_lock !node.current_request_id (fun id ->
id := !id + 1;
Lwt.return
Expand All @@ -28,9 +28,10 @@ let create_request node recipient payload =
sender = !node.address;
recipients = [recipient];
payload;
payload_signature;
})

let create_response node request payload =
let create_response node request (payload, payload_signature) =
Message.
{
category = Message.Response;
Expand All @@ -40,9 +41,10 @@ let create_response node request payload =
sender = !node.address;
recipients = [request.sender];
payload;
payload_signature;
}

let create_post node payload =
let create_post node (payload, payload_signature) =
Message.
{
category = Message.Post;
Expand All @@ -52,10 +54,11 @@ let create_post node payload =
sender = !node.address;
recipients = [];
payload;
payload_signature;
}

let request node request recipient =
let%lwt message = create_request node recipient request in
let request node (request, payload_signature) recipient =
let%lwt message = create_request node recipient (request, payload_signature) in
let%lwt () = Networking.send_to node message in
let condition_var = Lwt_condition.create () in
Hashtbl.add !node.request_table message.id condition_var;
Expand Down
9 changes: 5 additions & 4 deletions lib/node/client.mli → lib/pnode/client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ val post : node ref -> Message.t -> unit

(** [create_request node recipient payload] creates a [Message.t] of the {i Request category}
addressed to {i recipient} containing {i payload}. *)
val create_request : node ref -> Address.t -> bytes -> Message.t Lwt.t
val create_request :
node ref -> Address.t -> bytes * bytes option -> Message.t Lwt.t

(** [create_response node request payload] creates a [Message.t] of the {i Response category}
that responds to {i request} whose content is {i payload}. *)
val create_response : node ref -> Message.t -> bytes -> Message.t
val create_response : node ref -> Message.t -> bytes * bytes option -> Message.t

(** Sends an encoded {i request} to the specified peer and
returns a promise holding the response from the peer. This
function blocks the current thread of execution until a response
arrives. *)
val request : node ref -> bytes -> Address.t -> Message.t Lwt.t
val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t

val create_post : node ref -> bytes -> Message.t
val create_post : node ref -> bytes * bytes option -> Message.t
File renamed without changes.
File renamed without changes.
11 changes: 11 additions & 0 deletions lib/pnode/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(library
(name pnode)
(public_name pollinate.pnode)
(libraries bin_prot lwt lwt.unix pollinate.common)
(preprocess
(pps
ppx_bin_prot
lwt_ppx
ppx_deriving.eq
ppx_deriving.ord
ppx_deriving.show)))
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ let create_message node message recipient =
sender = Client.address_of !node;
recipients = [recipient.Peer.address];
payload = Encoding.pack bin_writer_message message;
payload_signature = None;
}

let send_message message node recipient =
Expand Down
File renamed without changes.
5 changes: 3 additions & 2 deletions lib/node/message.ml → lib/pnode/message.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type category =
| Post
| Failure_detection
| Custom of string
[@@deriving bin_io, show]
[@@deriving bin_io, show, eq, ord]

type t = {
category : category;
Expand All @@ -18,8 +18,9 @@ type t = {
sender : Address.t;
recipients : Address.t list;
payload : bytes;
payload_signature : bytes option;
}
[@@deriving bin_io]
[@@deriving bin_io, eq, ord]

let hash_of m =
[
Expand Down
5 changes: 3 additions & 2 deletions lib/node/message.mli → lib/pnode/message.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type category =
| Post
| Failure_detection
| Custom of string
[@@deriving bin_io, show]
[@@deriving bin_io, show, eq, ord]

(** Messages received from [peers] which are
processed by the node's message handler. *)
Expand All @@ -29,7 +29,8 @@ type t = {
sender : Address.t;
recipients : Address.t list;
payload : bytes;
payload_signature : bytes option;
}
[@@deriving bin_io]
[@@deriving bin_io, eq, ord]

val hash_of : t -> Digest.t
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion lib/node/node.mli → lib/pnode/pnode.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t

val run_server :
?preprocessor:(Message.t -> Message.t) ->
msg_handler:(Message.t -> bytes option) ->
msg_handler:(Message.t -> bytes option * bytes option) ->
t ref ->
'b Lwt.t

Expand Down
9 changes: 5 additions & 4 deletions lib/node/server.ml → lib/pnode/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ let handle_response node res =

Otherwise, we just apply the message handler and that's it.
*)
let process_message node preprocessor msg_handler =
let process_message node preprocessor
(msg_handler : Message.t -> bytes option * bytes option) =
let open Message in
let%lwt message = Networking.recv_next node in
let message = preprocessor message in
Expand All @@ -44,11 +45,11 @@ let process_message node preprocessor msg_handler =
!node.address.address !node.address.port message.sender.address
message.sender.port) in
match msg_handler message with
| Some response ->
response
| Some payload, signature_payload ->
(payload, signature_payload)
|> Client.create_response node message
|> Networking.send_to node
| None -> Lwt.return ())
| None, _ -> Lwt.return ())
| Failure_detection -> Failure_detector.handle_message node message
| Post ->
if not (Disseminator.seen !node.disseminator message) then (
Expand Down
2 changes: 1 addition & 1 deletion lib/node/server.mli → lib/pnode/server.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
val run :
Types.node ref ->
(Message.t -> Message.t) ->
(Message.t -> bytes option) ->
(Message.t -> bytes option * bytes option) ->
'b Lwt.t
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion lib/pollinate.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Address = Common.Address
module Util = Common.Util
module Peer = Common.Peer
module Node = Node
module PNode = Pnode
21 changes: 9 additions & 12 deletions test/commons.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(** Utils function shared by the different tests modules *)
module Commons = struct
open Pollinate.Node
open Pollinate.PNode
open Pollinate.Util
open Messages

Expand All @@ -19,15 +19,12 @@ module Commons = struct

let msg_handler message =
let open Messages in
let open Message in
match message.category with
| Request ->
let request = Encoding.unpack bin_read_request message.payload in
let response =
match request with
| Ping -> Pong
| Get -> Pong
| Insert _ -> Success "Successfully added value to state" in
Response response |> Encoding.pack bin_writer_message |> Option.some
| _ -> None
let open Pollinate.PNode.Message in
let request = Encoding.unpack bin_read_request message.payload in
let response =
match request with
| Ping -> Pong
| Get -> Success "Ok"
| Insert _ -> Success "Successfully added value to state" in
(Some (Encoding.pack bin_writer_message (Response response)), None)
end
50 changes: 25 additions & 25 deletions test/disseminator_tests.ml
Original file line number Diff line number Diff line change
@@ -1,53 +1,53 @@
open Lwt.Infix
open Commons
open Pollinate
open Pollinate.Node
open Pollinate.PNode

module Disseminator_tests = struct
let node =
Lwt_main.run
(let%lwt node_a =
Node.init ~init_peers:[] Address.{ address = "127.0.0.1"; port = 5000 }
in
PNode.init ~init_peers:[]
Address.{ address = "127.0.0.1"; port = 5000 } in
Lwt.return node_a)

let queue_insertion_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node
|> Client.post node;
let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes in
Client.create_post node (payload, None) |> Client.post node;

Lwt.return (List.length (Node.Testing.broadcast_queue node))
Lwt.return (List.length (PNode.Testing.broadcast_queue node))

let queue_removal_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node
|> Client.post node;
let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes in
Client.create_post node (payload, None) |> Client.post node;

let%lwt () =
while%lwt Node.Testing.disseminator_round node <= 10 do
while%lwt PNode.Testing.disseminator_round node <= 10 do
Lwt_unix.sleep 0.1
done in
Lwt.return (List.length (Node.Testing.broadcast_queue node))
Lwt.return (List.length (PNode.Testing.broadcast_queue node))

let seen_message_test () =
let _server = Node.run_server ~msg_handler:Commons.msg_handler node in
let message =
let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in
let payload =
Client.address_of !node
|> (fun Address.{ port; _ } -> port)
|> string_of_int
|> String.to_bytes
|> Client.create_post node in
|> String.to_bytes in
let message = Client.create_post node (payload, None) in
message |> Client.post node;

Lwt.return (Node.seen node message)
Lwt.return (PNode.seen node message)
end

(** Test for dissemination given a specific node. *)
Expand Down
6 changes: 3 additions & 3 deletions test/failure_detector_prop.ml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
open QCheck2.Gen
open Pollinate.Peer
open Pollinate
module SUT = Pollinate.Node.Testing.Failure_detector
module SUT = Pollinate.PNode.Testing.Failure_detector

let node_a =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 })
Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3002 })

let update_peer =
QCheck2.Test.make ~count:1000
~name:"update_neighbor_status successfully update neighbor status"
(pair Generators.peer_gen Generators.peer_status_gen)
(fun (neighbor, neighbor_status) ->
let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) neighbor in
let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) neighbor in
let _ = SUT.update_peer_status node_a neighbor neighbor_status in
neighbor.status = neighbor_status)

Expand Down
12 changes: 6 additions & 6 deletions test/failure_detector_tests.ml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
open Pollinate
open Pollinate.Node
open Pollinate.PNode
open Lwt.Infix
module SUT = Pollinate.Node.Testing.Failure_detector
module SUT = Pollinate.PNode.Testing.Failure_detector

let node_a =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 })
Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3003 })

let node_b =
Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3004 })
Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3004 })

let peer_b = Client.peer_from !node_b

let failure_detection () =
let open Common.Peer in
let open Client in
let _ = add_peer !node_a peer_b in
let _ = Pollinate.Node.Client.peer_from !node_a in
let _ = Pollinate.PNode.Client.peer_from !node_a in
let _ = SUT.update_peer_status node_a peer_b Suspicious in
(* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *)
let%lwt _ = Lwt_unix.sleep 9.1 in
Expand All @@ -24,7 +24,7 @@ let failure_detection () =

let failure_detection_nothing_on_alive () =
let open Common.Peer in
let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) peer_b in
let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) peer_b in
let _ = SUT.update_peer_status node_a peer_b Alive in
let%lwt _ = SUT.failure_detection node_a in
Lwt.return (Base.Hashtbl.length !node_a.peers = 1)
Expand Down
Loading