diff --git a/lib/dune b/lib/dune index f5c1898..71009f1 100644 --- a/lib/dune +++ b/lib/dune @@ -1,4 +1,4 @@ (library (name pollinate) (public_name pollinate) - (libraries pollinate.common pollinate.node)) + (libraries pollinate.common pollinate.pnode)) diff --git a/lib/node/dune b/lib/node/dune deleted file mode 100644 index b911382..0000000 --- a/lib/node/dune +++ /dev/null @@ -1,6 +0,0 @@ -(library - (name node) - (public_name pollinate.node) - (libraries bin_prot lwt lwt.unix pollinate.common) - (preprocess - (pps ppx_bin_prot ppx_deriving.show lwt_ppx))) diff --git a/lib/node/client.ml b/lib/pnode/client.ml similarity index 78% rename from lib/node/client.ml rename to lib/pnode/client.ml index 7bc8b9f..05a76ce 100644 --- a/lib/node/client.ml +++ b/lib/pnode/client.ml @@ -15,7 +15,7 @@ let peer_from { address; peers; _ } = let add_peer node (peer : Peer.t) = Base.Hashtbl.add node.peers ~key:peer.address ~data:peer -let create_request node recipient payload = +let create_request node recipient (payload, payload_signature) = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return @@ -28,9 +28,10 @@ let create_request node recipient payload = sender = !node.address; recipients = [recipient]; payload; + payload_signature; }) -let create_response node request payload = +let create_response node request (payload, payload_signature) = Message. { category = Message.Response; @@ -40,9 +41,10 @@ let create_response node request payload = sender = !node.address; recipients = [request.sender]; payload; + payload_signature; } -let create_post node payload = +let create_post node (payload, payload_signature) = Message. { category = Message.Post; @@ -52,10 +54,11 @@ let create_post node payload = sender = !node.address; recipients = []; payload; + payload_signature; } -let request node request recipient = - let%lwt message = create_request node recipient request in +let request node (request, payload_signature) recipient = + let%lwt message = create_request node recipient (request, payload_signature) in let%lwt () = Networking.send_to node message in let condition_var = Lwt_condition.create () in Hashtbl.add !node.request_table message.id condition_var; diff --git a/lib/node/client.mli b/lib/pnode/client.mli similarity index 76% rename from lib/node/client.mli rename to lib/pnode/client.mli index f75fe74..9292de2 100644 --- a/lib/node/client.mli +++ b/lib/pnode/client.mli @@ -20,16 +20,17 @@ val post : node ref -> Message.t -> unit (** [create_request node recipient payload] creates a [Message.t] of the {i Request category} addressed to {i recipient} containing {i payload}. *) -val create_request : node ref -> Address.t -> bytes -> Message.t Lwt.t +val create_request : + node ref -> Address.t -> bytes * bytes option -> Message.t Lwt.t (** [create_response node request payload] creates a [Message.t] of the {i Response category} that responds to {i request} whose content is {i payload}. *) -val create_response : node ref -> Message.t -> bytes -> Message.t +val create_response : node ref -> Message.t -> bytes * bytes option -> Message.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : node ref -> bytes -> Address.t -> Message.t Lwt.t +val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t -val create_post : node ref -> bytes -> Message.t +val create_post : node ref -> bytes * bytes option -> Message.t diff --git a/lib/node/disseminator.ml b/lib/pnode/disseminator.ml similarity index 100% rename from lib/node/disseminator.ml rename to lib/pnode/disseminator.ml diff --git a/lib/node/disseminator.mli b/lib/pnode/disseminator.mli similarity index 100% rename from lib/node/disseminator.mli rename to lib/pnode/disseminator.mli diff --git a/lib/pnode/dune b/lib/pnode/dune new file mode 100644 index 0000000..d250c7b --- /dev/null +++ b/lib/pnode/dune @@ -0,0 +1,11 @@ +(library + (name pnode) + (public_name pollinate.pnode) + (libraries bin_prot lwt lwt.unix pollinate.common) + (preprocess + (pps + ppx_bin_prot + lwt_ppx + ppx_deriving.eq + ppx_deriving.ord + ppx_deriving.show))) diff --git a/lib/node/failure_detector.ml b/lib/pnode/failure_detector.ml similarity index 99% rename from lib/node/failure_detector.ml rename to lib/pnode/failure_detector.ml index 7866ae9..4ce1d07 100644 --- a/lib/node/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -64,6 +64,7 @@ let create_message node message recipient = sender = Client.address_of !node; recipients = [recipient.Peer.address]; payload = Encoding.pack bin_writer_message message; + payload_signature = None; } let send_message message node recipient = diff --git a/lib/node/failure_detector.mli b/lib/pnode/failure_detector.mli similarity index 100% rename from lib/node/failure_detector.mli rename to lib/pnode/failure_detector.mli diff --git a/lib/node/message.ml b/lib/pnode/message.ml similarity index 85% rename from lib/node/message.ml rename to lib/pnode/message.ml index 4be371d..82cd0bb 100644 --- a/lib/node/message.ml +++ b/lib/pnode/message.ml @@ -8,7 +8,7 @@ type category = | Post | Failure_detection | Custom of string -[@@deriving bin_io, show] +[@@deriving bin_io, show, eq, ord] type t = { category : category; @@ -18,8 +18,9 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; + payload_signature : bytes option; } -[@@deriving bin_io] +[@@deriving bin_io, eq, ord] let hash_of m = [ diff --git a/lib/node/message.mli b/lib/pnode/message.mli similarity index 88% rename from lib/node/message.mli rename to lib/pnode/message.mli index dfca608..bc88c7e 100644 --- a/lib/node/message.mli +++ b/lib/pnode/message.mli @@ -17,7 +17,7 @@ type category = | Post | Failure_detection | Custom of string -[@@deriving bin_io, show] +[@@deriving bin_io, show, eq, ord] (** Messages received from [peers] which are processed by the node's message handler. *) @@ -29,7 +29,8 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; + payload_signature : bytes option; } -[@@deriving bin_io] +[@@deriving bin_io, eq, ord] val hash_of : t -> Digest.t diff --git a/lib/node/networking.ml b/lib/pnode/networking.ml similarity index 100% rename from lib/node/networking.ml rename to lib/pnode/networking.ml diff --git a/lib/node/networking.mli b/lib/pnode/networking.mli similarity index 100% rename from lib/node/networking.mli rename to lib/pnode/networking.mli diff --git a/lib/node/node.ml b/lib/pnode/pnode.ml similarity index 100% rename from lib/node/node.ml rename to lib/pnode/pnode.ml diff --git a/lib/node/node.mli b/lib/pnode/pnode.mli similarity index 93% rename from lib/node/node.mli rename to lib/pnode/pnode.mli index 2b07f37..a035e0c 100644 --- a/lib/node/node.mli +++ b/lib/pnode/pnode.mli @@ -15,7 +15,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> bytes option) -> + msg_handler:(Message.t -> bytes option * bytes option) -> t ref -> 'b Lwt.t diff --git a/lib/node/server.ml b/lib/pnode/server.ml similarity index 95% rename from lib/node/server.ml rename to lib/pnode/server.ml index e74eedb..e8463b2 100644 --- a/lib/node/server.ml +++ b/lib/pnode/server.ml @@ -26,7 +26,8 @@ let handle_response node res = Otherwise, we just apply the message handler and that's it. *) -let process_message node preprocessor msg_handler = +let process_message node preprocessor + (msg_handler : Message.t -> bytes option * bytes option) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -44,11 +45,11 @@ let process_message node preprocessor msg_handler = !node.address.address !node.address.port message.sender.address message.sender.port) in match msg_handler message with - | Some response -> - response + | Some payload, signature_payload -> + (payload, signature_payload) |> Client.create_response node message |> Networking.send_to node - | None -> Lwt.return ()) + | None, _ -> Lwt.return ()) | Failure_detection -> Failure_detector.handle_message node message | Post -> if not (Disseminator.seen !node.disseminator message) then ( diff --git a/lib/node/server.mli b/lib/pnode/server.mli similarity index 88% rename from lib/node/server.mli rename to lib/pnode/server.mli index 2cfa48c..17019e0 100644 --- a/lib/node/server.mli +++ b/lib/pnode/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> bytes option) -> + (Message.t -> bytes option * bytes option) -> 'b Lwt.t diff --git a/lib/node/types.ml b/lib/pnode/types.ml similarity index 100% rename from lib/node/types.ml rename to lib/pnode/types.ml diff --git a/lib/node/types.mli b/lib/pnode/types.mli similarity index 100% rename from lib/node/types.mli rename to lib/pnode/types.mli diff --git a/lib/node/util.ml b/lib/pnode/util.ml similarity index 100% rename from lib/node/util.ml rename to lib/pnode/util.ml diff --git a/lib/pollinate.ml b/lib/pollinate.ml index 84d1900..ab4d85a 100644 --- a/lib/pollinate.ml +++ b/lib/pollinate.ml @@ -1,4 +1,4 @@ module Address = Common.Address module Util = Common.Util module Peer = Common.Peer -module Node = Node +module PNode = Pnode diff --git a/test/commons.ml b/test/commons.ml index 3b083e5..0a67326 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -1,6 +1,6 @@ (** Utils function shared by the different tests modules *) module Commons = struct - open Pollinate.Node + open Pollinate.PNode open Pollinate.Util open Messages @@ -19,15 +19,12 @@ module Commons = struct let msg_handler message = let open Messages in - let open Message in - match message.category with - | Request -> - let request = Encoding.unpack bin_read_request message.payload in - let response = - match request with - | Ping -> Pong - | Get -> Pong - | Insert _ -> Success "Successfully added value to state" in - Response response |> Encoding.pack bin_writer_message |> Option.some - | _ -> None + let open Pollinate.PNode.Message in + let request = Encoding.unpack bin_read_request message.payload in + let response = + match request with + | Ping -> Pong + | Get -> Success "Ok" + | Insert _ -> Success "Successfully added value to state" in + (Some (Encoding.pack bin_writer_message (Response response)), None) end diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index b865de7..27712d0 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -1,53 +1,53 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Disseminator_tests = struct let node = Lwt_main.run (let%lwt node_a = - Node.init ~init_peers:[] Address.{ address = "127.0.0.1"; port = 5000 } - in + PNode.init ~init_peers:[] + Address.{ address = "127.0.0.1"; port = 5000 } in Lwt.return node_a) let queue_insertion_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node (payload, None) |> Client.post node; - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (PNode.Testing.broadcast_queue node)) let queue_removal_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node (payload, None) |> Client.post node; let%lwt () = - while%lwt Node.Testing.disseminator_round node <= 10 do + while%lwt PNode.Testing.disseminator_round node <= 10 do Lwt_unix.sleep 0.1 done in - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (PNode.Testing.broadcast_queue node)) let seen_message_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - let message = + let _server = PNode.run_server ~msg_handler:Commons.msg_handler node in + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post node in + |> String.to_bytes in + let message = Client.create_post node (payload, None) in message |> Client.post node; - Lwt.return (Node.seen node message) + Lwt.return (PNode.seen node message) end (** Test for dissemination given a specific node. *) diff --git a/test/failure_detector_prop.ml b/test/failure_detector_prop.ml index 38f3c42..f7fc885 100644 --- a/test/failure_detector_prop.ml +++ b/test/failure_detector_prop.ml @@ -1,17 +1,17 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3002 }) let update_peer = QCheck2.Test.make ~count:1000 ~name:"update_neighbor_status successfully update neighbor status" (pair Generators.peer_gen Generators.peer_status_gen) (fun (neighbor, neighbor_status) -> - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) neighbor in + let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) neighbor in let _ = SUT.update_peer_status node_a neighbor neighbor_status in neighbor.status = neighbor_status) diff --git a/test/failure_detector_tests.ml b/test/failure_detector_tests.ml index 9747cf7..54abe74 100644 --- a/test/failure_detector_tests.ml +++ b/test/failure_detector_tests.ml @@ -1,13 +1,13 @@ open Pollinate -open Pollinate.Node +open Pollinate.PNode open Lwt.Infix -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3003 }) let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3004 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3004 }) let peer_b = Client.peer_from !node_b @@ -15,7 +15,7 @@ let failure_detection () = let open Common.Peer in let open Client in let _ = add_peer !node_a peer_b in - let _ = Pollinate.Node.Client.peer_from !node_a in + let _ = Pollinate.PNode.Client.peer_from !node_a in let _ = SUT.update_peer_status node_a peer_b Suspicious in (* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *) let%lwt _ = Lwt_unix.sleep 9.1 in @@ -24,7 +24,7 @@ let failure_detection () = let failure_detection_nothing_on_alive () = let open Common.Peer in - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) peer_b in + let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) peer_b in let _ = SUT.update_peer_status node_a peer_b Alive in let%lwt _ = SUT.failure_detection node_a in Lwt.return (Base.Hashtbl.length !node_a.peers = 1) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 166a2d9..56c77bd 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -1,7 +1,7 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Gossip_tests = struct let local_address port = Address.{ address = "127.0.0.1"; port } @@ -41,16 +41,19 @@ module Gossip_tests = struct local_address 4009 ) in let%lwt node_a = - Node.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in - let%lwt node_b = Node.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in - let%lwt node_c = Node.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in - let%lwt node_d = Node.init ~init_peers:[addr_b] addr_d in - let%lwt node_e = Node.init ~init_peers:[addr_a; addr_b] addr_e in - let%lwt node_f = Node.init ~init_peers:[addr_c] addr_f in - let%lwt node_g = Node.init ~init_peers:[addr_c] addr_g in - let%lwt node_h = Node.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in - let%lwt node_i = Node.init ~init_peers:[addr_h] addr_i in - let%lwt node_j = Node.init ~init_peers:[addr_h] addr_j in + PNode.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in + let%lwt node_b = + PNode.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in + let%lwt node_c = + PNode.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in + let%lwt node_d = PNode.init ~init_peers:[addr_b] addr_d in + let%lwt node_e = PNode.init ~init_peers:[addr_a; addr_b] addr_e in + let%lwt node_f = PNode.init ~init_peers:[addr_c] addr_f in + let%lwt node_g = PNode.init ~init_peers:[addr_c] addr_g in + let%lwt node_h = + PNode.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in + let%lwt node_i = PNode.init ~init_peers:[addr_h] addr_i in + let%lwt node_j = PNode.init ~init_peers:[addr_h] addr_j in Lwt.return ( node_a, node_b, @@ -96,19 +99,19 @@ module Gossip_tests = struct let disseminate_from _n node = let _ = List.map - (Node.run_server ~preprocessor:Commons.preprocessor + (PNode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) nodes in - let message = + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post node in + |> String.to_bytes in + let message = Client.create_post node (payload, None) in Client.post node message; let seen () = - nodes |> List.filter (fun n -> Node.seen n message) |> node_ports in + nodes |> List.filter (fun n -> PNode.seen n message) |> node_ports in let%lwt () = let%lwt oc = diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 863a9ef..b5e1115 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -2,10 +2,10 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Networking +module SUT = Pollinate.PNode.Testing.Networking let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 2002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 2002 }) let knuth_shuffle_size = QCheck2.Test.make ~count:1000 diff --git a/test/node_tests.ml b/test/node_tests.ml index 84a17c7..fc56359 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -1,19 +1,19 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode open Pollinate.Util open Messages module Node_tests = struct (* Initializes two nodes and the related two peers *) let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3000 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3000 }) let peer_a = Client.peer_from !node_a let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3001 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3001 }) let peer_b = Client.peer_from !node_b @@ -23,45 +23,41 @@ module Node_tests = struct let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (PNode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let get = Encoding.pack bin_writer_message (Request Get) in let%lwt { payload = res_from_b; _ } = - Client.request node_a get peer_b.address in + Client.request node_a (get, None) peer_b.address in let res_from_b = Encoding.unpack bin_read_response res_from_b in - let%lwt { payload = res_from_a; _ } = - Client.request node_b get peer_a.address in + Client.request node_b (get, None) peer_a.address in let res_from_a = Encoding.unpack bin_read_response res_from_a in - let res_from_b, res_from_a = match (res_from_b, res_from_a) with - | Pong, Pong -> ("Ok", "Ok") + | Success "Ok", Success "Ok" -> ("Ok", "Ok") | _ -> failwith "Incorrect response" in - Lwt.return (res_from_b, res_from_a) let ping_pong () = let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (PNode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in - let%lwt { payload = pong; _ } = Client.request node_a ping peer_b.address in + let%lwt { payload = pong; _ } = + Client.request node_a (ping, None) peer_b.address in let pong = Encoding.unpack bin_read_response pong in - let pong = match pong with | Pong -> show_response Pong | _ -> failwith (Printf.sprintf "Incorrect response: %s" (show_response pong)) in - Lwt.return pong end