From c0bcd5d473b40e8678d8eca9c5e04781c878c4f8 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Mon, 4 Jul 2022 18:35:37 +0200 Subject: [PATCH 01/17] Re-add payload_signature --- lib/node/client.ml | 10 +++++++--- lib/node/client.mli | 8 ++++---- lib/node/failure_detector.ml | 1 + lib/node/message.ml | 1 + lib/node/message.mli | 1 + lib/node/node.mli | 2 +- lib/node/server.ml | 13 ++++++++----- lib/node/server.mli | 2 +- test/commons.ml | 5 +++-- test/disseminator_tests.ml | 30 +++++++++++++++--------------- test/gossip_tests.ml | 7 ++++--- test/node_tests.ml | 6 +++--- 12 files changed, 49 insertions(+), 37 deletions(-) diff --git a/lib/node/client.ml b/lib/node/client.ml index 71fa904..c232d29 100644 --- a/lib/node/client.ml +++ b/lib/node/client.ml @@ -21,7 +21,7 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers -let create_request node ?(request_ack = false) recipient payload = +let create_request node ?(request_ack = false) recipient (payload, payload_signature) = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return @@ -35,9 +35,10 @@ let create_request node ?(request_ack = false) recipient payload = sender = !node.address; recipients = [recipient]; payload; + payload_signature; }) -let create_response node ?(request_ack = false) request payload = +let create_response node ?(request_ack = false) request (payload, payload_signature) = Message. { category = Message.Response; @@ -48,9 +49,10 @@ let create_response node ?(request_ack = false) request payload = sender = !node.address; recipients = [request.sender]; payload; + payload_signature; } -let create_post node ?(request_ack = false) payload = +let create_post node ?(request_ack = false) (payload, payload_signature) = Message. { category = Message.Post; @@ -61,6 +63,7 @@ let create_post node ?(request_ack = false) payload = sender = !node.address; recipients = []; payload; + payload_signature; } let create_ack node incoming_message = @@ -74,6 +77,7 @@ let create_ack node incoming_message = sender = !node.address; recipients = [incoming_message.sender]; payload = incoming_message |> Message.hash_of |> Bytes.of_string; + payload_signature = None; } let request node request recipient = diff --git a/lib/node/client.mli b/lib/node/client.mli index eb4c2ce..3cdf674 100644 --- a/lib/node/client.mli +++ b/lib/node/client.mli @@ -27,22 +27,22 @@ val post : node ref -> Message.t -> unit (** [create_request node recipient payload] creates a [Message.t] of the {i Request category} addressed to {i recipient} containing {i payload}. *) val create_request : - node ref -> ?request_ack:bool -> Address.t -> bytes -> Message.t Lwt.t + node ref -> ?request_ack:bool -> Address.t -> bytes * bytes option -> Message.t Lwt.t (** [create_response node request payload] creates a [Message.t] of the {i Response category} that responds to {i request} whose content is {i payload}. *) val create_response : - node ref -> ?request_ack:bool -> Message.t -> bytes -> Message.t + node ref -> ?request_ack:bool -> Message.t -> bytes * bytes option -> Message.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : node ref -> bytes -> Address.t -> Message.t Lwt.t +val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the entire network. *) -val create_post : node ref -> ?request_ack:bool -> bytes -> Message.t +val create_post : node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t val create_ack : node ref -> Message.t -> Message.t diff --git a/lib/node/failure_detector.ml b/lib/node/failure_detector.ml index 43f0cae..8033a62 100644 --- a/lib/node/failure_detector.ml +++ b/lib/node/failure_detector.ml @@ -65,6 +65,7 @@ let create_message node message recipient = sender = Client.address_of !node; recipients = [recipient.Peer.address]; payload = Encoding.pack bin_writer_message message; + payload_signature = None; } let send_message message node recipient = diff --git a/lib/node/message.ml b/lib/node/message.ml index b451906..b19d059 100644 --- a/lib/node/message.ml +++ b/lib/node/message.ml @@ -20,6 +20,7 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; + payload_signature : bytes option; } [@@deriving bin_io] diff --git a/lib/node/message.mli b/lib/node/message.mli index 6f3be77..4e0ffc7 100644 --- a/lib/node/message.mli +++ b/lib/node/message.mli @@ -31,6 +31,7 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; + payload_signature : bytes option; } [@@deriving bin_io] diff --git a/lib/node/node.mli b/lib/node/node.mli index 03626c3..d1727f9 100644 --- a/lib/node/node.mli +++ b/lib/node/node.mli @@ -15,7 +15,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> bytes option) -> + msg_handler:(Message.t -> bytes option * bytes option) -> t ref -> 'b Lwt.t diff --git a/lib/node/server.ml b/lib/node/server.ml index a5243bc..58f1f79 100644 --- a/lib/node/server.ml +++ b/lib/node/server.ml @@ -33,7 +33,7 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) -let process_message node preprocessor msg_handler = +let process_message node preprocessor (msg_handler: Message.t -> bytes option * bytes option ) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -52,11 +52,14 @@ let process_message node preprocessor msg_handler = !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | Some response -> - response + Some payload, payload_signature -> + (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) + (payload, payload_signature) |> Client.create_response node message |> Networking.send_to node - | None -> Lwt.return ()) + | None, _ -> + (* let _ = Printf.sprintf "I am inside msg_handler, missing payload\n%!" in *) + Lwt.return ()) | Acknowledgment -> let msg_hash = Bytes.to_string message.payload in let new_addrs = @@ -128,7 +131,7 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor msg_handler = +let rec run node preprocessor (msg_handler: Message.t -> bytes option * bytes option ) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/node/server.mli b/lib/node/server.mli index 2cfa48c..17019e0 100644 --- a/lib/node/server.mli +++ b/lib/node/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> bytes option) -> + (Message.t -> bytes option * bytes option) -> 'b Lwt.t diff --git a/test/commons.ml b/test/commons.ml index 3b083e5..9b8106d 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -28,6 +28,7 @@ module Commons = struct | Ping -> Pong | Get -> Pong | Insert _ -> Success "Successfully added value to state" in - Response response |> Encoding.pack bin_writer_message |> Option.some - | _ -> None + ( Response response |> Encoding.pack bin_writer_message |> Option.some, + None ) + | _ -> (None, None) end diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index 8b23ee2..d2a3b04 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -13,23 +13,23 @@ module Disseminator_tests = struct let queue_insertion_test () = let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node (payload, None) |> Client.post node; Lwt.return (List.length (Node.Testing.broadcast_queue node)) let queue_removal_test () = let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node (payload, None) |> Client.post node; let%lwt () = while%lwt Node.Testing.disseminator_round node <= 10 do @@ -39,12 +39,12 @@ module Disseminator_tests = struct let seen_message_test () = let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - let message = + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post node in + |> String.to_bytes in + let message = Client.create_post node (payload, None) in message |> Client.post node; Lwt.return (Node.seen node message) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index c1fcb18..70e8454 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -101,12 +101,13 @@ module Gossip_tests = struct nodes in (* Create the message to be posted by node *) - let message = + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post ~request_ack:true node in + |> String.to_bytes in + let message = Client.create_post node (payload, None) ~request_ack:true in + Client.post node message; (* Post the created message *) Client.post node message; diff --git a/test/node_tests.ml b/test/node_tests.ml index 84a17c7..ec71548 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -29,11 +29,11 @@ module Node_tests = struct let get = Encoding.pack bin_writer_message (Request Get) in let%lwt { payload = res_from_b; _ } = - Client.request node_a get peer_b.address in + Client.request node_a (get, None) peer_b.address in let res_from_b = Encoding.unpack bin_read_response res_from_b in let%lwt { payload = res_from_a; _ } = - Client.request node_b get peer_a.address in + Client.request node_b (get, None) peer_a.address in let res_from_a = Encoding.unpack bin_read_response res_from_a in let res_from_b, res_from_a = @@ -52,7 +52,7 @@ module Node_tests = struct [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in - let%lwt { payload = pong; _ } = Client.request node_a ping peer_b.address in + let%lwt { payload = pong; _ } = Client.request node_a (ping, None) peer_b.address in let pong = Encoding.unpack bin_read_response pong in let pong = From 28aaf63c0d193f5809a78a5d45ff8b5177fa8d1c Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Tue, 5 Jul 2022 09:06:42 +0200 Subject: [PATCH 02/17] format --- .ocamlformat | 9 +++++---- lib/common/address.mli | 6 +++--- lib/common/mutex.mli | 14 +++++++------- lib/common/peer.mli | 16 ++++++++-------- lib/common/util.mli | 12 ++++++------ lib/node/client.ml | 6 ++++-- lib/node/client.mli | 33 +++++++++++++++++++++------------ lib/node/disseminator.ml | 2 +- lib/node/disseminator.mli | 16 ++++++++-------- lib/node/failure_detector.mli | 8 ++++---- lib/node/message.ml | 2 +- lib/node/message.mli | 6 +++--- lib/node/networking.mli | 10 +++++----- lib/node/node.ml | 3 ++- lib/node/node.mli | 5 +++-- lib/node/server.ml | 10 ++++++---- lib/node/server.mli | 6 +++--- lib/node/types.mli | 8 ++++---- test/address_prop.ml | 1 - test/generators.ml | 1 + test/messages.ml | 6 +++--- test/networking_prop.ml | 1 - test/node_tests.ml | 3 ++- test/peer_prop.ml | 1 - test/util_prop.ml | 1 - 25 files changed, 100 insertions(+), 86 deletions(-) diff --git a/.ocamlformat b/.ocamlformat index e768218..77cdc18 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,9 +1,11 @@ +# We're using a patched version of 0.20.1 provided +# by the ocaml-overlay in flake.nix + version = 0.21.0 + profile = conventional leading-nested-match-parens = false -align-constructors-decl = true -align-variants-decl = true space-around-variants = false space-around-arrays = false space-around-lists = false @@ -20,5 +22,4 @@ if-then-else = fit-or-vertical let-and = sparse type-decl = sparse sequence-blank-line=preserve-one -module-item-spacing=preserve -doc-comments=before +module-item-spacing=sparse diff --git a/lib/common/address.mli b/lib/common/address.mli index 8a137e4..52c0ba7 100644 --- a/lib/common/address.mli +++ b/lib/common/address.mli @@ -11,11 +11,11 @@ type t = { (** {1 API} *) -(** Create an [Address.t] from [address] as {i string} and [port] as {i int} *) val create : string -> int -> t +(** Create an [Address.t] from [address] as {i string} and [port] as {i int} *) -(** Obtain an [Address.t] from a [Unix.sockaddr] *) val from_sockaddr : Unix.sockaddr -> t +(** Obtain an [Address.t] from a [Unix.sockaddr] *) -(** Obtain a [Unix.sockaddr] from an [Address.t] *) val to_sockaddr : t -> Unix.sockaddr +(** Obtain a [Unix.sockaddr] from an [Address.t] *) diff --git a/lib/common/mutex.mli b/lib/common/mutex.mli index 67cea3e..1b41e4c 100644 --- a/lib/common/mutex.mli +++ b/lib/common/mutex.mli @@ -2,31 +2,31 @@ (** {1 Type} *) +type 'a t (** A wrapper for a value that needs to be protected by a mutex. *) -type 'a t (** {1 API} *) -(** Creates a mutex-wrapped value. *) val create : 'a -> 'a t +(** Creates a mutex-wrapped value. *) +val lock : 'a t -> 'a Lwt.t (** Locks the mutex protecting the value and returns a reference to the value so that it can be mutated. *) -val lock : 'a t -> 'a Lwt.t -(** Unlocks the mutex protecting the value. *) val unlock : 'a t -> unit +(** Unlocks the mutex protecting the value. *) -(** Checks if the mutex is locked. *) val is_locked : 'a t -> bool +(** Checks if the mutex is locked. *) +val with_lock : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t (** Locks the mutex, executes the given function on the value, unlocks the mutex, then returns the result of the function call. *) -val with_lock : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t +val unsafe : 'a t -> ('a -> 'b) -> 'b (** [unsafe t f] applies the function [f] to the value wrapped by [t] and returns the result {i without locking the mutex.} *) -val unsafe : 'a t -> ('a -> 'b) -> 'b diff --git a/lib/common/peer.mli b/lib/common/peer.mli index 9338855..c7ba3c4 100644 --- a/lib/common/peer.mli +++ b/lib/common/peer.mli @@ -13,32 +13,32 @@ type status = | Faulty [@@deriving show { with_path = false }, eq] -(** The type of a peer. [Neighbors] are represented -internally by a [Base.Hashtbl], so look-ups, insertions, -and removals are all approximately constant-time. *) type t = { address : Address.t; mutable status : status; mutable last_suspicious_status : float option; neighbors : (Address.t, t) Base.Hashtbl.t; } +(** The type of a peer. [Neighbors] are represented +internally by a [Base.Hashtbl], so look-ups, insertions, +and removals are all approximately constant-time. *) (** {1 API} *) +val from : Address.t -> t (** Constructs a [Peer.t] from an [Address.t]. This is the recommended way to create a Peer {i from scratch}. *) -val from : Address.t -> t -(** Constructs an [Address.t] from a [Unix.sockaddr] *) val from_socket_address : Unix.sockaddr -> t +(** Constructs an [Address.t] from a [Unix.sockaddr] *) -(** Adds a neighbor to the given peer's neighbors *) val add_neighbor : t -> t -> [`Duplicate | `Ok] +(** Adds a neighbor to the given peer's neighbors *) -(** Adds a list of neighbors to the given peer's neighbors *) val add_neighbors : t -> t list -> [`Duplicate | `Ok] list +(** Adds a list of neighbors to the given peer's neighbors *) +val get_neighbor : t -> Address.t -> t option (** Looks up a neighbor of the given peer by address and returns a [Peer.t option] containing the neighbor if this address is found. *) -val get_neighbor : t -> Address.t -> t option diff --git a/lib/common/util.mli b/lib/common/util.mli index c75b9b3..70bac59 100644 --- a/lib/common/util.mli +++ b/lib/common/util.mli @@ -10,33 +10,33 @@ val ( let* ) : 'a option -> ('a -> 'b option) -> 'b option module Encoding : sig (** Defines utilities for encoding or decoding messages. *) + val size_header_length : int (** The {i int} value of the necessary buffer size for storing the size header at the beginning of each Bin_prot payload. *) - val size_header_length : int + val read_size_header : bytes -> int (** Reads the value of the size header prepended to a serialized [Bin_prot] payload. *) - val read_size_header : bytes -> int + val pack : 'a Bin_prot.Type_class.writer -> 'a -> bytes (** Serializes a payload using a [Bin_prot writer] corresponding to its type. *) - val pack : 'a Bin_prot.Type_class.writer -> 'a -> bytes + val unpack : 'a Bin_prot.Read.reader -> bytes -> 'a (** Deserializes a payload using a [Bin_prot reader] corresponding to its type. The payload being deserialized {b MUST} have an 8 bytes size header, or this function will behave incorrectly. *) - val unpack : 'a Bin_prot.Read.reader -> bytes -> 'a + val str_dump : bytes -> string (** Produces a {i string} consisting of semi-colon (;) separated integer-representations of each byte in the input bytes. *) - val str_dump : bytes -> string end module Net : sig (** Defines utilities for working with UDP sockets *) + val create_socket : int -> Lwt_unix.file_descr Lwt.t (** Creates and binds a socket to [localhost:] where [port] is the lone argument to this function *) - val create_socket : int -> Lwt_unix.file_descr Lwt.t end diff --git a/lib/node/client.ml b/lib/node/client.ml index c232d29..74b398c 100644 --- a/lib/node/client.ml +++ b/lib/node/client.ml @@ -21,7 +21,8 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers -let create_request node ?(request_ack = false) recipient (payload, payload_signature) = +let create_request node ?(request_ack = false) recipient + (payload, payload_signature) = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return @@ -38,7 +39,8 @@ let create_request node ?(request_ack = false) recipient (payload, payload_signa payload_signature; }) -let create_response node ?(request_ack = false) request (payload, payload_signature) = +let create_response node ?(request_ack = false) request + (payload, payload_signature) = Message. { category = Message.Response; diff --git a/lib/node/client.mli b/lib/node/client.mli index 3cdf674..f31de0e 100644 --- a/lib/node/client.mli +++ b/lib/node/client.mli @@ -5,44 +5,53 @@ open Types (** {1 API} *) -(** Retrieve the [Address.t] of the given [Types.node]. *) val address_of : node -> Address.t +(** Retrieve the [Address.t] of the given [Types.node]. *) -(** Constructs a [Peer.t] from a [Types.node]. *) val peer_from : node -> Peer.t +(** Constructs a [Peer.t] from a [Types.node]. *) -(** Add a peer to the known peers by the peer's address. *) val add_peer : node -> Address.t -> [`Duplicate | `Ok] +(** Add a peer to the known peers by the peer's address. *) -(** Add a peer, along with all its existing state, to the known peers. *) val add_peer_as_is : node -> Peer.t -> [`Duplicate | `Ok] +(** Add a peer, along with all its existing state, to the known peers. *) -(** Get a list of addresses corresponding to peers of the given node. *) val peers : node -> Address.t list +(** Get a list of addresses corresponding to peers of the given node. *) +val post : node ref -> Message.t -> unit (** Begins disseminating an encoded message meant to be witnessed by the entire network. *) -val post : node ref -> Message.t -> unit +val create_request : + node ref -> + ?request_ack:bool -> + Address.t -> + bytes * bytes option -> + Message.t Lwt.t (** [create_request node recipient payload] creates a [Message.t] of the {i Request category} addressed to {i recipient} containing {i payload}. *) -val create_request : - node ref -> ?request_ack:bool -> Address.t -> bytes * bytes option -> Message.t Lwt.t +val create_response : + node ref -> + ?request_ack:bool -> + Message.t -> + bytes * bytes option -> + Message.t (** [create_response node request payload] creates a [Message.t] of the {i Response category} that responds to {i request} whose content is {i payload}. *) -val create_response : - node ref -> ?request_ack:bool -> Message.t -> bytes * bytes option -> Message.t +val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t +val create_post : + node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the entire network. *) -val create_post : node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t val create_ack : node ref -> Message.t -> Message.t diff --git a/lib/node/disseminator.ml b/lib/node/disseminator.ml index 9eb9152..45bb9e7 100644 --- a/lib/node/disseminator.ml +++ b/lib/node/disseminator.ml @@ -3,9 +3,9 @@ type pool_elt = { remaining : int; } +module DigestSet = Set.Make (Digest) (** Set of md5 message hashes in hex-string form for storing "seen" messages *) -module DigestSet = Set.Make (Digest) type t = { round : int; diff --git a/lib/node/disseminator.mli b/lib/node/disseminator.mli index bcea15e..ed85dee 100644 --- a/lib/node/disseminator.mli +++ b/lib/node/disseminator.mli @@ -1,44 +1,44 @@ (** Component responsible for gossip-style dissemination of messages across the network *) +type t (** A record containing information and state relevant to the dissemination component. *) -type t +val create : num_rounds:int -> epoch_length:float -> t (** Creates a dissemination component that can be attached to a node when given the number of "rounds" for which each new message should be disseminated and an "epoch length", in seconds, which determines whether a message is too old to be disseminated again by checking whether the message is newer than n seconds old, where n is the given epoch length. *) -val create : num_rounds:int -> epoch_length:float -> t +val next_round : t -> t (** Starts the next round of dissemination, affecting the state of the disseminator. In particular, this function causes the current round to increase, reduces the number of rounds remaining for each message being disseminated, and filters out messages with no rounds remaining or a timestamp that's older than epoch_length seconds. *) -val next_round : t -> t +val post : t -> Message.t -> t (** Adds a new message to the dissemination pool. The message will not be posted if it is older than the epoch_length. Otherwise, the message will begin to be disseminated automatically as long as the disseminator is running along with Networking.disseminate.*) -val post : t -> Message.t -> t +val broadcast_queue : t -> Message.t list (** Returns the list of messages that need to be disseminated. For exclusive use by Networking.disseminate. *) -val broadcast_queue : t -> Message.t list +val seen : t -> Message.t -> bool (** Determines whether the dissemination component has witnessed a given message before, or whether the message is too old to be retained in the set of seen messages. *) -val seen : t -> Message.t -> bool +val get_seen_messages : t -> string list (** Returns the 7 digit hashes of all the messages that the disseminator has seen. *) -val get_seen_messages : t -> string list -(** Returns the current disseminator round. *) val current_round : t -> int +(** Returns the current disseminator round. *) diff --git a/lib/node/failure_detector.mli b/lib/node/failure_detector.mli index 5d1914a..0740d84 100644 --- a/lib/node/failure_detector.mli +++ b/lib/node/failure_detector.mli @@ -20,23 +20,23 @@ type message = (** {1 Constructor} *) +val make : failure_detector_config -> failure_detector (** Initializes the failure detection component with a default state and given config. *) -val make : failure_detector_config -> failure_detector (** {1 Messaging} *) -(** Processes an incoming [Message.t] bound for the failure detector of a node. *) val handle_message : node ref -> Message.t -> unit Lwt.t +(** Processes an incoming [Message.t] bound for the failure detector of a node. *) (** {1 Detection functions} *) -(** Responsible for the calculation of the status of each node *) val suspicion_detection : node ref -> unit Lwt.t +(** Responsible for the calculation of the status of each node *) +val failure_detection : node ref -> unit Lwt.t (** If a peer is suspicious for more that failure_detector_config.suspicion_time it needs to be deleted from the list of knowns peers *) -val failure_detection : node ref -> unit Lwt.t (**/**) diff --git a/lib/node/message.ml b/lib/node/message.ml index b19d059..504f66c 100644 --- a/lib/node/message.ml +++ b/lib/node/message.ml @@ -8,7 +8,7 @@ type category = | Response | Post | Failure_detection - | Custom of string + | Custom of string [@@deriving bin_io, show] type t = { diff --git a/lib/node/message.mli b/lib/node/message.mli index 4e0ffc7..cf60ff6 100644 --- a/lib/node/message.mli +++ b/lib/node/message.mli @@ -17,11 +17,9 @@ type category = | Response | Post | Failure_detection - | Custom of string + | Custom of string [@@deriving bin_io, show] -(** Messages received from [peers] which are -processed by the node's message handler. *) type t = { category : category; sub_category_opt : (string * string) option; @@ -34,5 +32,7 @@ type t = { payload_signature : bytes option; } [@@deriving bin_io] +(** Messages received from [peers] which are +processed by the node's message handler. *) val hash_of : t -> Digest.t diff --git a/lib/node/networking.mli b/lib/node/networking.mli index e818c96..56cfb19 100644 --- a/lib/node/networking.mli +++ b/lib/node/networking.mli @@ -1,22 +1,22 @@ open Types open Common +val send_to : node ref -> Message.t -> unit Lwt.t (** Sends a message via datagram from the given [Types.node] to a specified peer within the [Message.t]. Construct a message with one of the [create_*] functions to then feed to this function. *) -val send_to : node ref -> Message.t -> unit Lwt.t -(** Waits for the next incoming message and returns it. *) val recv_next : node ref -> Message.t Lwt.t +(** Waits for the next incoming message and returns it. *) +val disseminate : node ref -> unit Lwt.t (** Advances a node's disseminator by disseminating the messages in the queue and pruning outdated messages from the queue. *) -val disseminate : node ref -> unit Lwt.t -(** Given a Base.Hashtbl of Addresses to Peers and a number n of peers to be - randomly chosen, returns a list of addresses corresponding to n *) val pick_random_neighbors : (Address.t, Peer.t) Base.Hashtbl.t -> int -> Address.t list +(** Given a Base.Hashtbl of Addresses to Peers and a number n of peers to be + randomly chosen, returns a list of addresses corresponding to n *) module Testing : sig val knuth_shuffle : 'a list -> 'a list diff --git a/lib/node/node.ml b/lib/node/node.ml index 33205f1..be2cd03 100644 --- a/lib/node/node.ml +++ b/lib/node/node.ml @@ -1,6 +1,5 @@ open Common open Types - module Message = Message module Client = Client @@ -45,6 +44,8 @@ module Testing = struct module AddressSet = Types.AddressSet module Failure_detector = Failure_detector module Networking = Networking + let broadcast_queue node = Disseminator.broadcast_queue !node.disseminator + let disseminator_round node = Disseminator.current_round !node.disseminator end diff --git a/lib/node/node.mli b/lib/node/node.mli index d1727f9..61aadd7 100644 --- a/lib/node/node.mli +++ b/lib/node/node.mli @@ -1,17 +1,16 @@ open Common - module Message = Message module Client = Client type t = Types.node +val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t (** Initializes the node with an initial state, an optional preprocessing function that the consumer can use to inspect and modify the incoming message as well as its metadata, and a message handler that acts on the current state and the incoming Message.t. The message handler is used to initialize a server that runs asynchronously. Returns reference to the newly created node. *) -val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> @@ -25,6 +24,8 @@ module Testing : sig module AddressSet = Types.AddressSet module Failure_detector = Failure_detector module Networking = Networking + val broadcast_queue : t ref -> Message.t list + val disseminator_round : t ref -> int end diff --git a/lib/node/server.ml b/lib/node/server.ml index 58f1f79..d283754 100644 --- a/lib/node/server.ml +++ b/lib/node/server.ml @@ -33,7 +33,8 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) -let process_message node preprocessor (msg_handler: Message.t -> bytes option * bytes option ) = +let process_message node preprocessor + (msg_handler : Message.t -> bytes option * bytes option) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -52,12 +53,12 @@ let process_message node preprocessor (msg_handler: Message.t -> bytes option * !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - Some payload, payload_signature -> + | Some payload, payload_signature -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) (payload, payload_signature) |> Client.create_response node message |> Networking.send_to node - | None, _ -> + | None, _ -> (* let _ = Printf.sprintf "I am inside msg_handler, missing payload\n%!" in *) Lwt.return ()) | Acknowledgment -> @@ -131,7 +132,8 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor (msg_handler: Message.t -> bytes option * bytes option ) = +let rec run node preprocessor + (msg_handler : Message.t -> bytes option * bytes option) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/node/server.mli b/lib/node/server.mli index 17019e0..51e5b6b 100644 --- a/lib/node/server.mli +++ b/lib/node/server.mli @@ -1,8 +1,8 @@ -(** Runs the server given a reference to a node, a message preprocessor and a message handler. - The server is responsible for running failure detection and dissemination processes, as well - as issuing responses to nodes making individual requests via the message handler. *) val run : Types.node ref -> (Message.t -> Message.t) -> (Message.t -> bytes option * bytes option) -> 'b Lwt.t +(** Runs the server given a reference to a node, a message preprocessor and a message handler. + The server is responsible for running failure detection and dissemination processes, as well + as issuing responses to nodes making individual requests via the message handler. *) diff --git a/lib/node/types.mli b/lib/node/types.mli index 19b2663..1f52f13 100644 --- a/lib/node/types.mli +++ b/lib/node/types.mli @@ -3,8 +3,6 @@ open Lwt_unix (** {1 Types} *) -(** Configurable parameters that affect various aspects of the failure -detector *) type failure_detector_config = { (* The period of time within which peers may be randomly chosen to be pinged, and within which any peer who has been pinged must @@ -25,8 +23,9 @@ type failure_detector_config = { has failed to respond with acknowledgement during the round_trip_time. *) helpers_size : int; } +(** Configurable parameters that affect various aspects of the failure +detector *) -(** The state of a failure detection component. *) type failure_detector = { config : failure_detector_config; (* Table mapping sequence numbers to condition variables that get @@ -35,10 +34,10 @@ type failure_detector = { acknowledges : (int, unit Lwt_condition.t) Base.Hashtbl.t; mutable sequence_number : int; } +(** The state of a failure detection component. *) module AddressSet : Set.S with type elt = Address.t -(** Represents a node with some state in a peer-to-peer network *) type node = { address : Address.t; (* An ID that is incremented whenever a request is @@ -67,3 +66,4 @@ type node = { with other nodes in the network *) mutable disseminator : Disseminator.t; } +(** Represents a node with some state in a peer-to-peer network *) diff --git a/test/address_prop.ml b/test/address_prop.ml index eecd8f0..8301cc6 100644 --- a/test/address_prop.ml +++ b/test/address_prop.ml @@ -1,5 +1,4 @@ open QCheck2.Gen - module SUT = Pollinate.Address let create = diff --git a/test/generators.ml b/test/generators.ml index 043cf7d..820bbf1 100644 --- a/test/generators.ml +++ b/test/generators.ml @@ -1,6 +1,7 @@ open Messages open QCheck2.Gen open Pollinate + let address_gen = pair (pure "127.0.0.1") int >|= fun (address, port) -> Address.{ address; port } diff --git a/test/messages.ml b/test/messages.ml index b85b2d1..9f6c6d8 100644 --- a/test/messages.ml +++ b/test/messages.ml @@ -14,13 +14,13 @@ module Messages = struct type response = | Pong - | List of string list + | List of string list | Success of string - | Error of string + | Error of string [@@deriving bin_io, show { with_path = false }] type message = - | Request of request + | Request of request | Response of response [@@deriving bin_io, show { with_path = false }] end diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 863a9ef..2bc018b 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -1,7 +1,6 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate - module SUT = Pollinate.Node.Testing.Networking let node_a = diff --git a/test/node_tests.ml b/test/node_tests.ml index ec71548..140ff2f 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -52,7 +52,8 @@ module Node_tests = struct [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in - let%lwt { payload = pong; _ } = Client.request node_a (ping, None) peer_b.address in + let%lwt { payload = pong; _ } = + Client.request node_a (ping, None) peer_b.address in let pong = Encoding.unpack bin_read_response pong in let pong = diff --git a/test/peer_prop.ml b/test/peer_prop.ml index 092311b..1d6895b 100644 --- a/test/peer_prop.ml +++ b/test/peer_prop.ml @@ -1,5 +1,4 @@ open QCheck2.Gen - module SUT = Pollinate.Peer let add_peer = diff --git a/test/util_prop.ml b/test/util_prop.ml index 7888c4a..53b4333 100644 --- a/test/util_prop.ml +++ b/test/util_prop.ml @@ -1,5 +1,4 @@ open Messages - module SUT = Pollinate.Util.Encoding let pack_unpack = From 0d1c6e37c138ce992027740635b54d424b805ec7 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Tue, 5 Jul 2022 11:35:13 +0200 Subject: [PATCH 03/17] temp rework of FMT --- .ocamlformat | 7 +++++-- flake.nix | 2 +- lib/common/address.mli | 6 +++--- lib/common/mutex.mli | 14 +++++++------- lib/common/peer.mli | 16 ++++++++-------- lib/common/util.mli | 12 ++++++------ lib/node/client.mli | 26 +++++++++++++------------- lib/node/disseminator.ml | 2 +- lib/node/disseminator.mli | 16 ++++++++-------- lib/node/failure_detector.mli | 8 ++++---- lib/node/message.ml | 2 +- lib/node/message.mli | 6 +++--- lib/node/networking.mli | 10 +++++----- lib/node/node.mli | 2 +- lib/node/server.mli | 6 +++--- lib/node/types.mli | 26 +++++++++++++------------- test/messages.ml | 6 +++--- 17 files changed, 85 insertions(+), 82 deletions(-) diff --git a/.ocamlformat b/.ocamlformat index 77cdc18..8d03a4c 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,11 +1,13 @@ # We're using a patched version of 0.20.1 provided # by the ocaml-overlay in flake.nix -version = 0.21.0 +version = 0.20.1 profile = conventional leading-nested-match-parens = false +align-constructors-decl = true +align-variants-decl = true space-around-variants = false space-around-arrays = false space-around-lists = false @@ -22,4 +24,5 @@ if-then-else = fit-or-vertical let-and = sparse type-decl = sparse sequence-blank-line=preserve-one -module-item-spacing=sparse +module-item-spacing=preserve +doc-comments=before diff --git a/flake.nix b/flake.nix index e1c7c3c..1934bd4 100644 --- a/flake.nix +++ b/flake.nix @@ -33,7 +33,7 @@ buildInputs = with pkgs; with ocamlPackages_dev; [ ocaml-lsp - ocamlformat + ocamlformat_0_20_1 odoc ocaml dune_3 diff --git a/lib/common/address.mli b/lib/common/address.mli index 52c0ba7..8a137e4 100644 --- a/lib/common/address.mli +++ b/lib/common/address.mli @@ -11,11 +11,11 @@ type t = { (** {1 API} *) -val create : string -> int -> t (** Create an [Address.t] from [address] as {i string} and [port] as {i int} *) +val create : string -> int -> t -val from_sockaddr : Unix.sockaddr -> t (** Obtain an [Address.t] from a [Unix.sockaddr] *) +val from_sockaddr : Unix.sockaddr -> t -val to_sockaddr : t -> Unix.sockaddr (** Obtain a [Unix.sockaddr] from an [Address.t] *) +val to_sockaddr : t -> Unix.sockaddr diff --git a/lib/common/mutex.mli b/lib/common/mutex.mli index 1b41e4c..67cea3e 100644 --- a/lib/common/mutex.mli +++ b/lib/common/mutex.mli @@ -2,31 +2,31 @@ (** {1 Type} *) -type 'a t (** A wrapper for a value that needs to be protected by a mutex. *) +type 'a t (** {1 API} *) -val create : 'a -> 'a t (** Creates a mutex-wrapped value. *) +val create : 'a -> 'a t -val lock : 'a t -> 'a Lwt.t (** Locks the mutex protecting the value and returns a reference to the value so that it can be mutated. *) +val lock : 'a t -> 'a Lwt.t -val unlock : 'a t -> unit (** Unlocks the mutex protecting the value. *) +val unlock : 'a t -> unit -val is_locked : 'a t -> bool (** Checks if the mutex is locked. *) +val is_locked : 'a t -> bool -val with_lock : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t (** Locks the mutex, executes the given function on the value, unlocks the mutex, then returns the result of the function call. *) +val with_lock : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t -val unsafe : 'a t -> ('a -> 'b) -> 'b (** [unsafe t f] applies the function [f] to the value wrapped by [t] and returns the result {i without locking the mutex.} *) +val unsafe : 'a t -> ('a -> 'b) -> 'b diff --git a/lib/common/peer.mli b/lib/common/peer.mli index c7ba3c4..9338855 100644 --- a/lib/common/peer.mli +++ b/lib/common/peer.mli @@ -13,32 +13,32 @@ type status = | Faulty [@@deriving show { with_path = false }, eq] +(** The type of a peer. [Neighbors] are represented +internally by a [Base.Hashtbl], so look-ups, insertions, +and removals are all approximately constant-time. *) type t = { address : Address.t; mutable status : status; mutable last_suspicious_status : float option; neighbors : (Address.t, t) Base.Hashtbl.t; } -(** The type of a peer. [Neighbors] are represented -internally by a [Base.Hashtbl], so look-ups, insertions, -and removals are all approximately constant-time. *) (** {1 API} *) -val from : Address.t -> t (** Constructs a [Peer.t] from an [Address.t]. This is the recommended way to create a Peer {i from scratch}. *) +val from : Address.t -> t -val from_socket_address : Unix.sockaddr -> t (** Constructs an [Address.t] from a [Unix.sockaddr] *) +val from_socket_address : Unix.sockaddr -> t -val add_neighbor : t -> t -> [`Duplicate | `Ok] (** Adds a neighbor to the given peer's neighbors *) +val add_neighbor : t -> t -> [`Duplicate | `Ok] -val add_neighbors : t -> t list -> [`Duplicate | `Ok] list (** Adds a list of neighbors to the given peer's neighbors *) +val add_neighbors : t -> t list -> [`Duplicate | `Ok] list -val get_neighbor : t -> Address.t -> t option (** Looks up a neighbor of the given peer by address and returns a [Peer.t option] containing the neighbor if this address is found. *) +val get_neighbor : t -> Address.t -> t option diff --git a/lib/common/util.mli b/lib/common/util.mli index 70bac59..c75b9b3 100644 --- a/lib/common/util.mli +++ b/lib/common/util.mli @@ -10,33 +10,33 @@ val ( let* ) : 'a option -> ('a -> 'b option) -> 'b option module Encoding : sig (** Defines utilities for encoding or decoding messages. *) - val size_header_length : int (** The {i int} value of the necessary buffer size for storing the size header at the beginning of each Bin_prot payload. *) + val size_header_length : int - val read_size_header : bytes -> int (** Reads the value of the size header prepended to a serialized [Bin_prot] payload. *) + val read_size_header : bytes -> int - val pack : 'a Bin_prot.Type_class.writer -> 'a -> bytes (** Serializes a payload using a [Bin_prot writer] corresponding to its type. *) + val pack : 'a Bin_prot.Type_class.writer -> 'a -> bytes - val unpack : 'a Bin_prot.Read.reader -> bytes -> 'a (** Deserializes a payload using a [Bin_prot reader] corresponding to its type. The payload being deserialized {b MUST} have an 8 bytes size header, or this function will behave incorrectly. *) + val unpack : 'a Bin_prot.Read.reader -> bytes -> 'a - val str_dump : bytes -> string (** Produces a {i string} consisting of semi-colon (;) separated integer-representations of each byte in the input bytes. *) + val str_dump : bytes -> string end module Net : sig (** Defines utilities for working with UDP sockets *) - val create_socket : int -> Lwt_unix.file_descr Lwt.t (** Creates and binds a socket to [localhost:] where [port] is the lone argument to this function *) + val create_socket : int -> Lwt_unix.file_descr Lwt.t end diff --git a/lib/node/client.mli b/lib/node/client.mli index f31de0e..425da90 100644 --- a/lib/node/client.mli +++ b/lib/node/client.mli @@ -5,53 +5,53 @@ open Types (** {1 API} *) -val address_of : node -> Address.t (** Retrieve the [Address.t] of the given [Types.node]. *) +val address_of : node -> Address.t -val peer_from : node -> Peer.t (** Constructs a [Peer.t] from a [Types.node]. *) +val peer_from : node -> Peer.t -val add_peer : node -> Address.t -> [`Duplicate | `Ok] (** Add a peer to the known peers by the peer's address. *) +val add_peer : node -> Address.t -> [`Duplicate | `Ok] -val add_peer_as_is : node -> Peer.t -> [`Duplicate | `Ok] (** Add a peer, along with all its existing state, to the known peers. *) +val add_peer_as_is : node -> Peer.t -> [`Duplicate | `Ok] -val peers : node -> Address.t list (** Get a list of addresses corresponding to peers of the given node. *) +val peers : node -> Address.t list -val post : node ref -> Message.t -> unit (** Begins disseminating an encoded message meant to be witnessed by the entire network. *) +val post : node ref -> Message.t -> unit +(** [create_request node recipient payload] creates a [Message.t] of the {i Request category} +addressed to {i recipient} containing {i payload}. *) val create_request : node ref -> ?request_ack:bool -> Address.t -> bytes * bytes option -> Message.t Lwt.t -(** [create_request node recipient payload] creates a [Message.t] of the {i Request category} -addressed to {i recipient} containing {i payload}. *) +(** [create_response node request payload] creates a [Message.t] of the {i Response category} +that responds to {i request} whose content is {i payload}. *) val create_response : node ref -> ?request_ack:bool -> Message.t -> bytes * bytes option -> Message.t -(** [create_response node request payload] creates a [Message.t] of the {i Response category} -that responds to {i request} whose content is {i payload}. *) -val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) +val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t -val create_post : - node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the entire network. *) +val create_post : + node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t val create_ack : node ref -> Message.t -> Message.t diff --git a/lib/node/disseminator.ml b/lib/node/disseminator.ml index 45bb9e7..9eb9152 100644 --- a/lib/node/disseminator.ml +++ b/lib/node/disseminator.ml @@ -3,9 +3,9 @@ type pool_elt = { remaining : int; } -module DigestSet = Set.Make (Digest) (** Set of md5 message hashes in hex-string form for storing "seen" messages *) +module DigestSet = Set.Make (Digest) type t = { round : int; diff --git a/lib/node/disseminator.mli b/lib/node/disseminator.mli index ed85dee..bcea15e 100644 --- a/lib/node/disseminator.mli +++ b/lib/node/disseminator.mli @@ -1,44 +1,44 @@ (** Component responsible for gossip-style dissemination of messages across the network *) -type t (** A record containing information and state relevant to the dissemination component. *) +type t -val create : num_rounds:int -> epoch_length:float -> t (** Creates a dissemination component that can be attached to a node when given the number of "rounds" for which each new message should be disseminated and an "epoch length", in seconds, which determines whether a message is too old to be disseminated again by checking whether the message is newer than n seconds old, where n is the given epoch length. *) +val create : num_rounds:int -> epoch_length:float -> t -val next_round : t -> t (** Starts the next round of dissemination, affecting the state of the disseminator. In particular, this function causes the current round to increase, reduces the number of rounds remaining for each message being disseminated, and filters out messages with no rounds remaining or a timestamp that's older than epoch_length seconds. *) +val next_round : t -> t -val post : t -> Message.t -> t (** Adds a new message to the dissemination pool. The message will not be posted if it is older than the epoch_length. Otherwise, the message will begin to be disseminated automatically as long as the disseminator is running along with Networking.disseminate.*) +val post : t -> Message.t -> t -val broadcast_queue : t -> Message.t list (** Returns the list of messages that need to be disseminated. For exclusive use by Networking.disseminate. *) +val broadcast_queue : t -> Message.t list -val seen : t -> Message.t -> bool (** Determines whether the dissemination component has witnessed a given message before, or whether the message is too old to be retained in the set of seen messages. *) +val seen : t -> Message.t -> bool -val get_seen_messages : t -> string list (** Returns the 7 digit hashes of all the messages that the disseminator has seen. *) +val get_seen_messages : t -> string list -val current_round : t -> int (** Returns the current disseminator round. *) +val current_round : t -> int diff --git a/lib/node/failure_detector.mli b/lib/node/failure_detector.mli index 0740d84..5d1914a 100644 --- a/lib/node/failure_detector.mli +++ b/lib/node/failure_detector.mli @@ -20,23 +20,23 @@ type message = (** {1 Constructor} *) -val make : failure_detector_config -> failure_detector (** Initializes the failure detection component with a default state and given config. *) +val make : failure_detector_config -> failure_detector (** {1 Messaging} *) -val handle_message : node ref -> Message.t -> unit Lwt.t (** Processes an incoming [Message.t] bound for the failure detector of a node. *) +val handle_message : node ref -> Message.t -> unit Lwt.t (** {1 Detection functions} *) -val suspicion_detection : node ref -> unit Lwt.t (** Responsible for the calculation of the status of each node *) +val suspicion_detection : node ref -> unit Lwt.t -val failure_detection : node ref -> unit Lwt.t (** If a peer is suspicious for more that failure_detector_config.suspicion_time it needs to be deleted from the list of knowns peers *) +val failure_detection : node ref -> unit Lwt.t (**/**) diff --git a/lib/node/message.ml b/lib/node/message.ml index 504f66c..b19d059 100644 --- a/lib/node/message.ml +++ b/lib/node/message.ml @@ -8,7 +8,7 @@ type category = | Response | Post | Failure_detection - | Custom of string + | Custom of string [@@deriving bin_io, show] type t = { diff --git a/lib/node/message.mli b/lib/node/message.mli index cf60ff6..4e0ffc7 100644 --- a/lib/node/message.mli +++ b/lib/node/message.mli @@ -17,9 +17,11 @@ type category = | Response | Post | Failure_detection - | Custom of string + | Custom of string [@@deriving bin_io, show] +(** Messages received from [peers] which are +processed by the node's message handler. *) type t = { category : category; sub_category_opt : (string * string) option; @@ -32,7 +34,5 @@ type t = { payload_signature : bytes option; } [@@deriving bin_io] -(** Messages received from [peers] which are -processed by the node's message handler. *) val hash_of : t -> Digest.t diff --git a/lib/node/networking.mli b/lib/node/networking.mli index 56cfb19..e818c96 100644 --- a/lib/node/networking.mli +++ b/lib/node/networking.mli @@ -1,22 +1,22 @@ open Types open Common -val send_to : node ref -> Message.t -> unit Lwt.t (** Sends a message via datagram from the given [Types.node] to a specified peer within the [Message.t]. Construct a message with one of the [create_*] functions to then feed to this function. *) +val send_to : node ref -> Message.t -> unit Lwt.t -val recv_next : node ref -> Message.t Lwt.t (** Waits for the next incoming message and returns it. *) +val recv_next : node ref -> Message.t Lwt.t -val disseminate : node ref -> unit Lwt.t (** Advances a node's disseminator by disseminating the messages in the queue and pruning outdated messages from the queue. *) +val disseminate : node ref -> unit Lwt.t -val pick_random_neighbors : - (Address.t, Peer.t) Base.Hashtbl.t -> int -> Address.t list (** Given a Base.Hashtbl of Addresses to Peers and a number n of peers to be randomly chosen, returns a list of addresses corresponding to n *) +val pick_random_neighbors : + (Address.t, Peer.t) Base.Hashtbl.t -> int -> Address.t list module Testing : sig val knuth_shuffle : 'a list -> 'a list diff --git a/lib/node/node.mli b/lib/node/node.mli index 61aadd7..5d345e0 100644 --- a/lib/node/node.mli +++ b/lib/node/node.mli @@ -4,13 +4,13 @@ module Client = Client type t = Types.node -val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t (** Initializes the node with an initial state, an optional preprocessing function that the consumer can use to inspect and modify the incoming message as well as its metadata, and a message handler that acts on the current state and the incoming Message.t. The message handler is used to initialize a server that runs asynchronously. Returns reference to the newly created node. *) +val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> diff --git a/lib/node/server.mli b/lib/node/server.mli index 51e5b6b..17019e0 100644 --- a/lib/node/server.mli +++ b/lib/node/server.mli @@ -1,8 +1,8 @@ +(** Runs the server given a reference to a node, a message preprocessor and a message handler. + The server is responsible for running failure detection and dissemination processes, as well + as issuing responses to nodes making individual requests via the message handler. *) val run : Types.node ref -> (Message.t -> Message.t) -> (Message.t -> bytes option * bytes option) -> 'b Lwt.t -(** Runs the server given a reference to a node, a message preprocessor and a message handler. - The server is responsible for running failure detection and dissemination processes, as well - as issuing responses to nodes making individual requests via the message handler. *) diff --git a/lib/node/types.mli b/lib/node/types.mli index 1f52f13..afc82a9 100644 --- a/lib/node/types.mli +++ b/lib/node/types.mli @@ -3,6 +3,8 @@ open Lwt_unix (** {1 Types} *) +(** Configurable parameters that affect various aspects of the failure +detector *) type failure_detector_config = { (* The period of time within which peers may be randomly chosen to be pinged, and within which any peer who has been pinged must @@ -23,9 +25,8 @@ type failure_detector_config = { has failed to respond with acknowledgement during the round_trip_time. *) helpers_size : int; } -(** Configurable parameters that affect various aspects of the failure -detector *) +(** The state of a failure detection component. *) type failure_detector = { config : failure_detector_config; (* Table mapping sequence numbers to condition variables that get @@ -34,22 +35,22 @@ type failure_detector = { acknowledges : (int, unit Lwt_condition.t) Base.Hashtbl.t; mutable sequence_number : int; } -(** The state of a failure detection component. *) module AddressSet : Set.S with type elt = Address.t +(** Represents a node with some state in a peer-to-peer network *) type node = { address : Address.t; - (* An ID that is incremented whenever a request is - made from this node. The response matching this - request will carry the same ID, allowing the response - to be identified and thus stopping the request from - blocking. *) + (* An ID that is incremented whenever a request is + made from this node. The response matching this + request will carry the same ID, allowing the response + to be identified and thus stopping the request from + blocking. *) current_request_id : int ref Mutex.t; - (* A hashtable that pairs request IDs with condition variables. - When a response is received by the server, it checks this table - for a waiting request and signals the request's condition variable - with the incoming response. *) + (* A hashtable that pairs request IDs with condition variables. + When a response is received by the server, it checks this table + for a waiting request and signals the request's condition variable + with the incoming response. *) request_table : (int, Message.t Lwt_condition.t) Hashtbl.t; socket : file_descr Mutex.t; (* Failure detection component ; runs automatically with the server and is responsible @@ -66,4 +67,3 @@ type node = { with other nodes in the network *) mutable disseminator : Disseminator.t; } -(** Represents a node with some state in a peer-to-peer network *) diff --git a/test/messages.ml b/test/messages.ml index 9f6c6d8..b85b2d1 100644 --- a/test/messages.ml +++ b/test/messages.ml @@ -14,13 +14,13 @@ module Messages = struct type response = | Pong - | List of string list + | List of string list | Success of string - | Error of string + | Error of string [@@deriving bin_io, show { with_path = false }] type message = - | Request of request + | Request of request | Response of response [@@deriving bin_io, show { with_path = false }] end From bc9e01c21634029bbf30ee3ff8057a52b22779d0 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Tue, 5 Jul 2022 18:39:46 +0200 Subject: [PATCH 04/17] quick and dirty rename of Node package --- lib/dune | 2 +- lib/{node => pnode}/client.ml | 0 lib/{node => pnode}/client.mli | 0 lib/{node => pnode}/disseminator.ml | 0 lib/{node => pnode}/disseminator.mli | 0 lib/{node => pnode}/dune | 2 +- lib/{node => pnode}/failure_detector.ml | 0 lib/{node => pnode}/failure_detector.mli | 0 lib/{node => pnode}/message.ml | 0 lib/{node => pnode}/message.mli | 0 lib/{node => pnode}/networking.ml | 0 lib/{node => pnode}/networking.mli | 0 lib/{node => pnode}/node.ml | 0 lib/{node => pnode}/node.mli | 0 lib/{node => pnode}/server.ml | 0 lib/{node => pnode}/server.mli | 0 lib/{node => pnode}/types.ml | 0 lib/{node => pnode}/types.mli | 0 lib/{node => pnode}/util.ml | 0 lib/pollinate.ml | 2 +- test/commons.ml | 2 +- test/disseminator_tests.ml | 2 +- test/failure_detector_prop.ml | 4 ++-- test/failure_detector_tests.ml | 8 ++++---- test/gossip_tests.ml | 2 +- test/networking_prop.ml | 2 +- test/node_tests.ml | 2 +- 27 files changed, 14 insertions(+), 14 deletions(-) rename lib/{node => pnode}/client.ml (100%) rename lib/{node => pnode}/client.mli (100%) rename lib/{node => pnode}/disseminator.ml (100%) rename lib/{node => pnode}/disseminator.mli (100%) rename lib/{node => pnode}/dune (81%) rename lib/{node => pnode}/failure_detector.ml (100%) rename lib/{node => pnode}/failure_detector.mli (100%) rename lib/{node => pnode}/message.ml (100%) rename lib/{node => pnode}/message.mli (100%) rename lib/{node => pnode}/networking.ml (100%) rename lib/{node => pnode}/networking.mli (100%) rename lib/{node => pnode}/node.ml (100%) rename lib/{node => pnode}/node.mli (100%) rename lib/{node => pnode}/server.ml (100%) rename lib/{node => pnode}/server.mli (100%) rename lib/{node => pnode}/types.ml (100%) rename lib/{node => pnode}/types.mli (100%) rename lib/{node => pnode}/util.ml (100%) diff --git a/lib/dune b/lib/dune index f5c1898..71009f1 100644 --- a/lib/dune +++ b/lib/dune @@ -1,4 +1,4 @@ (library (name pollinate) (public_name pollinate) - (libraries pollinate.common pollinate.node)) + (libraries pollinate.common pollinate.pnode)) diff --git a/lib/node/client.ml b/lib/pnode/client.ml similarity index 100% rename from lib/node/client.ml rename to lib/pnode/client.ml diff --git a/lib/node/client.mli b/lib/pnode/client.mli similarity index 100% rename from lib/node/client.mli rename to lib/pnode/client.mli diff --git a/lib/node/disseminator.ml b/lib/pnode/disseminator.ml similarity index 100% rename from lib/node/disseminator.ml rename to lib/pnode/disseminator.ml diff --git a/lib/node/disseminator.mli b/lib/pnode/disseminator.mli similarity index 100% rename from lib/node/disseminator.mli rename to lib/pnode/disseminator.mli diff --git a/lib/node/dune b/lib/pnode/dune similarity index 81% rename from lib/node/dune rename to lib/pnode/dune index b911382..564f9a3 100644 --- a/lib/node/dune +++ b/lib/pnode/dune @@ -1,6 +1,6 @@ (library (name node) - (public_name pollinate.node) + (public_name pollinate.pnode) (libraries bin_prot lwt lwt.unix pollinate.common) (preprocess (pps ppx_bin_prot ppx_deriving.show lwt_ppx))) diff --git a/lib/node/failure_detector.ml b/lib/pnode/failure_detector.ml similarity index 100% rename from lib/node/failure_detector.ml rename to lib/pnode/failure_detector.ml diff --git a/lib/node/failure_detector.mli b/lib/pnode/failure_detector.mli similarity index 100% rename from lib/node/failure_detector.mli rename to lib/pnode/failure_detector.mli diff --git a/lib/node/message.ml b/lib/pnode/message.ml similarity index 100% rename from lib/node/message.ml rename to lib/pnode/message.ml diff --git a/lib/node/message.mli b/lib/pnode/message.mli similarity index 100% rename from lib/node/message.mli rename to lib/pnode/message.mli diff --git a/lib/node/networking.ml b/lib/pnode/networking.ml similarity index 100% rename from lib/node/networking.ml rename to lib/pnode/networking.ml diff --git a/lib/node/networking.mli b/lib/pnode/networking.mli similarity index 100% rename from lib/node/networking.mli rename to lib/pnode/networking.mli diff --git a/lib/node/node.ml b/lib/pnode/node.ml similarity index 100% rename from lib/node/node.ml rename to lib/pnode/node.ml diff --git a/lib/node/node.mli b/lib/pnode/node.mli similarity index 100% rename from lib/node/node.mli rename to lib/pnode/node.mli diff --git a/lib/node/server.ml b/lib/pnode/server.ml similarity index 100% rename from lib/node/server.ml rename to lib/pnode/server.ml diff --git a/lib/node/server.mli b/lib/pnode/server.mli similarity index 100% rename from lib/node/server.mli rename to lib/pnode/server.mli diff --git a/lib/node/types.ml b/lib/pnode/types.ml similarity index 100% rename from lib/node/types.ml rename to lib/pnode/types.ml diff --git a/lib/node/types.mli b/lib/pnode/types.mli similarity index 100% rename from lib/node/types.mli rename to lib/pnode/types.mli diff --git a/lib/node/util.ml b/lib/pnode/util.ml similarity index 100% rename from lib/node/util.ml rename to lib/pnode/util.ml diff --git a/lib/pollinate.ml b/lib/pollinate.ml index 84d1900..2d02d90 100644 --- a/lib/pollinate.ml +++ b/lib/pollinate.ml @@ -1,4 +1,4 @@ module Address = Common.Address module Util = Common.Util module Peer = Common.Peer -module Node = Node +module PNode = Node diff --git a/test/commons.ml b/test/commons.ml index 9b8106d..4c9d7fa 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -1,6 +1,6 @@ (** Utils function shared by the different tests modules *) module Commons = struct - open Pollinate.Node + open Pollinate.PNode open Pollinate.Util open Messages diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index d2a3b04..515c7c3 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -1,7 +1,7 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Disseminator_tests = struct let node = diff --git a/test/failure_detector_prop.ml b/test/failure_detector_prop.ml index 38f3c42..ff45a9f 100644 --- a/test/failure_detector_prop.ml +++ b/test/failure_detector_prop.ml @@ -1,7 +1,7 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 }) @@ -11,7 +11,7 @@ let update_peer = ~name:"update_neighbor_status successfully update neighbor status" (pair Generators.peer_gen Generators.peer_status_gen) (fun (neighbor, neighbor_status) -> - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) neighbor in + let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) neighbor in let _ = SUT.update_peer_status node_a neighbor neighbor_status in neighbor.status = neighbor_status) diff --git a/test/failure_detector_tests.ml b/test/failure_detector_tests.ml index 9a46f83..53d91f6 100644 --- a/test/failure_detector_tests.ml +++ b/test/failure_detector_tests.ml @@ -1,7 +1,7 @@ open Pollinate -open Pollinate.Node +open Pollinate.PNode open Lwt.Infix -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 }) @@ -15,7 +15,7 @@ let failure_detection () = let open Common.Peer in let open Client in let _ = add_peer_as_is !node_a peer_b in - let _ = Pollinate.Node.Client.peer_from !node_a in + let _ = Pollinate.PNode.Client.peer_from !node_a in let _ = SUT.update_peer_status node_a peer_b Suspicious in (* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *) let%lwt _ = Lwt_unix.sleep 9.1 in @@ -24,7 +24,7 @@ let failure_detection () = let failure_detection_nothing_on_alive () = let open Common.Peer in - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) peer_b in + let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) peer_b in let _ = SUT.update_peer_status node_a peer_b Alive in let%lwt _ = SUT.failure_detection node_a in Lwt.return (Base.Hashtbl.length !node_a.peers = 1) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 70e8454..486e91f 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -1,7 +1,7 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Gossip_tests = struct let local_address port = Address.{ address = "127.0.0.1"; port } diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 2bc018b..660f8fd 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -1,7 +1,7 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Networking +module SUT = Pollinate.PNode.Testing.Networking let node_a = Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 2002 }) diff --git a/test/node_tests.ml b/test/node_tests.ml index 140ff2f..b7a6972 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -1,7 +1,7 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode open Pollinate.Util open Messages From 7068755040f2c7f8a2e3f209473f7a7daf8860d7 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Wed, 6 Jul 2022 10:19:54 +0200 Subject: [PATCH 05/17] following review --- lib/pnode/client.ml | 22 +++++++++++----------- lib/pnode/client.mli | 15 +++++++++++---- lib/pnode/failure_detector.ml | 2 +- lib/pnode/message.ml | 2 +- lib/pnode/message.mli | 2 +- lib/pnode/node.mli | 2 +- lib/pnode/server.ml | 16 ++++++---------- lib/pnode/server.mli | 2 +- test/commons.ml | 5 ++--- test/disseminator_tests.ml | 6 +++--- test/gossip_tests.ml | 4 +++- test/node_tests.ml | 6 +++--- 12 files changed, 44 insertions(+), 40 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index 74b398c..c462f50 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -21,8 +21,8 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers -let create_request node ?(request_ack = false) recipient - (payload, payload_signature) = +let create_request node ?(request_ack = false) recipient ?payload_signature + payload = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return @@ -36,11 +36,11 @@ let create_request node ?(request_ack = false) recipient sender = !node.address; recipients = [recipient]; payload; - payload_signature; + payload_signature_opt = payload_signature; }) -let create_response node ?(request_ack = false) request - (payload, payload_signature) = +let create_response node ?(request_ack = false) request ?payload_signature + payload = Message. { category = Message.Response; @@ -51,10 +51,10 @@ let create_response node ?(request_ack = false) request sender = !node.address; recipients = [request.sender]; payload; - payload_signature; + payload_signature_opt = payload_signature; } -let create_post node ?(request_ack = false) (payload, payload_signature) = +let create_post node ?(request_ack = false) ?payload_signature payload = Message. { category = Message.Post; @@ -65,7 +65,7 @@ let create_post node ?(request_ack = false) (payload, payload_signature) = sender = !node.address; recipients = []; payload; - payload_signature; + payload_signature_opt = payload_signature; } let create_ack node incoming_message = @@ -79,11 +79,11 @@ let create_ack node incoming_message = sender = !node.address; recipients = [incoming_message.sender]; payload = incoming_message |> Message.hash_of |> Bytes.of_string; - payload_signature = None; + payload_signature_opt = None; } -let request node request recipient = - let%lwt message = create_request node recipient request in +let request node recipient ?payload_signature payload = + let%lwt message = create_request node recipient ?payload_signature payload in let%lwt () = Networking.send_to node message in let condition_var = Lwt_condition.create () in Hashtbl.add !node.request_table message.id condition_var; diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index 425da90..076d144 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -30,7 +30,8 @@ val create_request : node ref -> ?request_ack:bool -> Address.t -> - bytes * bytes option -> + ?payload_signature:bytes -> + bytes -> Message.t Lwt.t (** [create_response node request payload] creates a [Message.t] of the {i Response category} @@ -39,19 +40,25 @@ val create_response : node ref -> ?request_ack:bool -> Message.t -> - bytes * bytes option -> + ?payload_signature:bytes -> + bytes -> Message.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : node ref -> bytes * bytes option -> Address.t -> Message.t Lwt.t +val request : + node ref -> Address.t -> ?payload_signature:bytes -> bytes -> Message.t Lwt.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the entire network. *) val create_post : - node ref -> ?request_ack:bool -> bytes * bytes option -> Message.t + node ref -> + ?request_ack:bool -> + ?payload_signature:bytes -> + bytes -> + Message.t val create_ack : node ref -> Message.t -> Message.t diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index 8033a62..4a79024 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -65,7 +65,7 @@ let create_message node message recipient = sender = Client.address_of !node; recipients = [recipient.Peer.address]; payload = Encoding.pack bin_writer_message message; - payload_signature = None; + payload_signature_opt = None; } let send_message message node recipient = diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index b19d059..caffef6 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -20,7 +20,7 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; - payload_signature : bytes option; + payload_signature_opt : bytes option; } [@@deriving bin_io] diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index 4e0ffc7..d5f4ea3 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -31,7 +31,7 @@ type t = { sender : Address.t; recipients : Address.t list; payload : bytes; - payload_signature : bytes option; + payload_signature_opt : bytes option; } [@@deriving bin_io] diff --git a/lib/pnode/node.mli b/lib/pnode/node.mli index 5d345e0..0d6ac7d 100644 --- a/lib/pnode/node.mli +++ b/lib/pnode/node.mli @@ -14,7 +14,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> bytes option * bytes option) -> + msg_handler:(Message.t -> bytes * bytes option) -> t ref -> 'b Lwt.t diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index d283754..c28be91 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -34,7 +34,7 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) let process_message node preprocessor - (msg_handler : Message.t -> bytes option * bytes option) = + (msg_handler : Message.t -> bytes * bytes option) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -53,14 +53,10 @@ let process_message node preprocessor !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | Some payload, payload_signature -> + | payload, payload_signature -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) - (payload, payload_signature) - |> Client.create_response node message - |> Networking.send_to node - | None, _ -> - (* let _ = Printf.sprintf "I am inside msg_handler, missing payload\n%!" in *) - Lwt.return ()) + Client.create_response node message ?payload_signature payload + |> Networking.send_to node) | Acknowledgment -> let msg_hash = Bytes.to_string message.payload in let new_addrs = @@ -132,8 +128,8 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor - (msg_handler : Message.t -> bytes option * bytes option) = +let rec run node preprocessor (msg_handler : Message.t -> bytes * bytes option) + = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/pnode/server.mli b/lib/pnode/server.mli index 17019e0..2429b86 100644 --- a/lib/pnode/server.mli +++ b/lib/pnode/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> bytes option * bytes option) -> + (Message.t -> bytes * bytes option) -> 'b Lwt.t diff --git a/test/commons.ml b/test/commons.ml index 4c9d7fa..2616a29 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -28,7 +28,6 @@ module Commons = struct | Ping -> Pong | Get -> Pong | Insert _ -> Success "Successfully added value to state" in - ( Response response |> Encoding.pack bin_writer_message |> Option.some, - None ) - | _ -> (None, None) + (Response response |> Encoding.pack bin_writer_message, None) + | _ -> failwith "unhandled case" end diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index 515c7c3..e61fa1b 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -18,7 +18,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - Client.create_post node (payload, None) |> Client.post node; + Client.create_post node payload ?payload_signature:None |> Client.post node; Lwt.return (List.length (Node.Testing.broadcast_queue node)) @@ -29,7 +29,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - Client.create_post node (payload, None) |> Client.post node; + Client.create_post node payload ?payload_signature:None |> Client.post node; let%lwt () = while%lwt Node.Testing.disseminator_round node <= 10 do @@ -44,7 +44,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - let message = Client.create_post node (payload, None) in + let message = Client.create_post node payload ?payload_signature:None in message |> Client.post node; Lwt.return (Node.seen node message) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 486e91f..8d62bc6 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -106,7 +106,9 @@ module Gossip_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - let message = Client.create_post node (payload, None) ~request_ack:true in + let message = + Client.create_post node payload ?payload_signature:None ~request_ack:true + in Client.post node message; (* Post the created message *) diff --git a/test/node_tests.ml b/test/node_tests.ml index b7a6972..2bc109f 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -29,11 +29,11 @@ module Node_tests = struct let get = Encoding.pack bin_writer_message (Request Get) in let%lwt { payload = res_from_b; _ } = - Client.request node_a (get, None) peer_b.address in + Client.request node_a peer_b.address get ?payload_signature:None in let res_from_b = Encoding.unpack bin_read_response res_from_b in let%lwt { payload = res_from_a; _ } = - Client.request node_b (get, None) peer_a.address in + Client.request node_b peer_a.address get ?payload_signature:None in let res_from_a = Encoding.unpack bin_read_response res_from_a in let res_from_b, res_from_a = @@ -53,7 +53,7 @@ module Node_tests = struct let ping = Encoding.pack bin_writer_message (Request Ping) in let%lwt { payload = pong; _ } = - Client.request node_a (ping, None) peer_b.address in + Client.request node_a peer_b.address ping ?payload_signature:None in let pong = Encoding.unpack bin_read_response pong in let pong = From 08a641821e0ecfa2e02d96948eaab072b16abf81 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Wed, 6 Jul 2022 10:46:44 +0200 Subject: [PATCH 06/17] fix tests --- test/commons.ml | 3 ++- test/gossip_tests.ml | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/test/commons.ml b/test/commons.ml index 2616a29..3fbb4e4 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -29,5 +29,6 @@ module Commons = struct | Get -> Pong | Insert _ -> Success "Successfully added value to state" in (Response response |> Encoding.pack bin_writer_message, None) - | _ -> failwith "unhandled case" + | Post -> (message.payload, None) (* ugly fix for tests. In case of a *) + | _ -> failwith "unhandled in tests" end diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 8d62bc6..aa0c2b1 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -107,10 +107,8 @@ module Gossip_tests = struct |> string_of_int |> String.to_bytes in let message = - Client.create_post node payload ?payload_signature:None ~request_ack:true + Client.create_post node ~request_ack:true ?payload_signature:None payload in - Client.post node message; - (* Post the created message *) Client.post node message; From 5c2f49e8f60f86ce6d872ab626657e6cdfde76d1 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Wed, 6 Jul 2022 23:01:38 +0200 Subject: [PATCH 07/17] review + rework request --- lib/pnode/client.ml | 26 +++++++++++++------------- lib/pnode/client.mli | 6 ++++-- lib/pnode/failure_detector.ml | 4 ++-- lib/pnode/message.ml | 4 ++-- lib/pnode/message.mli | 4 ++-- test/disseminator_tests.ml | 6 +++--- test/gossip_tests.ml | 4 +--- test/node_tests.ml | 12 ++++++------ 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index c462f50..e7a2490 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -22,68 +22,68 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers let create_request node ?(request_ack = false) recipient ?payload_signature - payload = + ?sub_category payload = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return Message. { category = Message.Request; - sub_category_opt = None; + sub_category; request_ack; id = !id; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [recipient]; payload; - payload_signature_opt = payload_signature; + payload_signature; }) let create_response node ?(request_ack = false) request ?payload_signature - payload = + ?sub_category payload = Message. { category = Message.Response; - sub_category_opt = None; + sub_category; request_ack; id = request.id; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [request.sender]; payload; - payload_signature_opt = payload_signature; + payload_signature; } -let create_post node ?(request_ack = false) ?payload_signature payload = +let create_post node ?(request_ack = false) ?payload_signature ?sub_category + payload = Message. { category = Message.Post; request_ack; id = -1; - sub_category_opt = None; + sub_category; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; payload; - payload_signature_opt = payload_signature; + payload_signature; } let create_ack node incoming_message = Message. { category = Message.Acknowledgment; - sub_category_opt = None; + sub_category = None; request_ack = false; id = -1; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [incoming_message.sender]; payload = incoming_message |> Message.hash_of |> Bytes.of_string; - payload_signature_opt = None; + payload_signature = None; } -let request node recipient ?payload_signature payload = - let%lwt message = create_request node recipient ?payload_signature payload in +let request node message = let%lwt () = Networking.send_to node message in let condition_var = Lwt_condition.create () in Hashtbl.add !node.request_table message.id condition_var; diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index 076d144..6d26624 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -31,6 +31,7 @@ val create_request : ?request_ack:bool -> Address.t -> ?payload_signature:bytes -> + ?sub_category:string * string -> bytes -> Message.t Lwt.t @@ -41,6 +42,7 @@ val create_response : ?request_ack:bool -> Message.t -> ?payload_signature:bytes -> + ?sub_category:string * string -> bytes -> Message.t @@ -48,8 +50,7 @@ val create_response : returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : - node ref -> Address.t -> ?payload_signature:bytes -> bytes -> Message.t Lwt.t +val request : node ref -> Message.t -> Message.t Lwt.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the @@ -58,6 +59,7 @@ val create_post : node ref -> ?request_ack:bool -> ?payload_signature:bytes -> + ?sub_category:string * string -> bytes -> Message.t diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index 4a79024..2e680d7 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -59,13 +59,13 @@ let create_message node message recipient = { category = Failure_detection; request_ack = false; - sub_category_opt = None; + sub_category = None; id = -1; timestamp = Unix.gettimeofday (); sender = Client.address_of !node; recipients = [recipient.Peer.address]; payload = Encoding.pack bin_writer_message message; - payload_signature_opt = None; + payload_signature = None; } let send_message message node recipient = diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index caffef6..720e56b 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -13,14 +13,14 @@ type category = type t = { category : category; - sub_category_opt : (string * string) option; + sub_category : (string * string) option; request_ack : bool; id : int; timestamp : float; sender : Address.t; recipients : Address.t list; payload : bytes; - payload_signature_opt : bytes option; + payload_signature : bytes option; } [@@deriving bin_io] diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index d5f4ea3..577e7b9 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -24,14 +24,14 @@ type category = processed by the node's message handler. *) type t = { category : category; - sub_category_opt : (string * string) option; + sub_category : (string * string) option; request_ack : bool; id : int; timestamp : float; sender : Address.t; recipients : Address.t list; payload : bytes; - payload_signature_opt : bytes option; + payload_signature : bytes option; } [@@deriving bin_io] diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index e61fa1b..57e1f31 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -18,7 +18,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - Client.create_post node payload ?payload_signature:None |> Client.post node; + Client.create_post node payload |> Client.post node; Lwt.return (List.length (Node.Testing.broadcast_queue node)) @@ -29,7 +29,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - Client.create_post node payload ?payload_signature:None |> Client.post node; + Client.create_post node payload |> Client.post node; let%lwt () = while%lwt Node.Testing.disseminator_round node <= 10 do @@ -44,7 +44,7 @@ module Disseminator_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - let message = Client.create_post node payload ?payload_signature:None in + let message = Client.create_post node payload in message |> Client.post node; Lwt.return (Node.seen node message) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index aa0c2b1..0074ce3 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -106,9 +106,7 @@ module Gossip_tests = struct |> (fun Address.{ port; _ } -> port) |> string_of_int |> String.to_bytes in - let message = - Client.create_post node ~request_ack:true ?payload_signature:None payload - in + let message = Client.create_post node ~request_ack:true payload in (* Post the created message *) Client.post node message; diff --git a/test/node_tests.ml b/test/node_tests.ml index 2bc109f..5b84a6a 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -28,12 +28,12 @@ module Node_tests = struct [node_a; node_b] in let get = Encoding.pack bin_writer_message (Request Get) in - let%lwt { payload = res_from_b; _ } = - Client.request node_a peer_b.address get ?payload_signature:None in + let%lwt message = Client.create_request node_a peer_b.address get in + let%lwt { payload = res_from_b; _ } = Client.request node_a message in let res_from_b = Encoding.unpack bin_read_response res_from_b in - let%lwt { payload = res_from_a; _ } = - Client.request node_b peer_a.address get ?payload_signature:None in + let%lwt message = Client.create_request node_b peer_a.address get in + let%lwt { payload = res_from_a; _ } = Client.request node_b message in let res_from_a = Encoding.unpack bin_read_response res_from_a in let res_from_b, res_from_a = @@ -52,8 +52,8 @@ module Node_tests = struct [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in - let%lwt { payload = pong; _ } = - Client.request node_a peer_b.address ping ?payload_signature:None in + let%lwt message = Client.create_request node_a peer_b.address ping in + let%lwt { payload = pong; _ } = Client.request node_a message in let pong = Encoding.unpack bin_read_response pong in let pong = From 0c34e8d52ba5f76db9d91986020cc178eebc4554 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Thu, 7 Jul 2022 11:59:03 +0200 Subject: [PATCH 08/17] expose broadcast function + rework msg_handler --- lib/pnode/client.ml | 3 +++ lib/pnode/client.mli | 2 ++ lib/pnode/message.ml | 6 ++++++ lib/pnode/message.mli | 6 ++++++ lib/pnode/networking.mli | 3 +++ lib/pnode/node.mli | 2 +- lib/pnode/server.ml | 14 ++++++++------ lib/pnode/server.mli | 2 +- test/commons.ml | 12 +++++++++--- 9 files changed, 39 insertions(+), 11 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index e7a2490..da69253 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -91,3 +91,6 @@ let request node message = let post node message = !node.disseminator <- Disseminator.post !node.disseminator message + +let broadcast node message recipients = + Networking.broadcast node message recipients diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index 6d26624..ef1f20a 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -64,3 +64,5 @@ val create_post : Message.t val create_ack : node ref -> Message.t -> Message.t + +val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index 720e56b..bc1b382 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -24,6 +24,12 @@ type t = { } [@@deriving bin_io] +type msg = { + payload : bytes; + payload_signature : bytes option; +} +[@@deriving bin_io] + let hash_of m = [ m.sender.address; diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index 577e7b9..f99a55c 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -35,4 +35,10 @@ type t = { } [@@deriving bin_io] +type msg = { + payload : bytes; + payload_signature : bytes option; +} +[@@deriving bin_io] + val hash_of : t -> Digest.t diff --git a/lib/pnode/networking.mli b/lib/pnode/networking.mli index e818c96..58bc477 100644 --- a/lib/pnode/networking.mli +++ b/lib/pnode/networking.mli @@ -6,6 +6,9 @@ to a specified peer within the [Message.t]. Construct a message with one of the [create_*] functions to then feed to this function. *) val send_to : node ref -> Message.t -> unit Lwt.t +(** Same as `send_to` but to several recipients *) +val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t + (** Waits for the next incoming message and returns it. *) val recv_next : node ref -> Message.t Lwt.t diff --git a/lib/pnode/node.mli b/lib/pnode/node.mli index 0d6ac7d..fc9dff5 100644 --- a/lib/pnode/node.mli +++ b/lib/pnode/node.mli @@ -14,7 +14,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> bytes * bytes option) -> + msg_handler:(Message.t -> Message.msg option) -> t ref -> 'b Lwt.t diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index c28be91..9c52f73 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -34,7 +34,7 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) let process_message node preprocessor - (msg_handler : Message.t -> bytes * bytes option) = + (msg_handler : Message.t -> Message.msg option) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -53,10 +53,13 @@ let process_message node preprocessor !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | payload, payload_signature -> + | Some msg -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) - Client.create_response node message ?payload_signature payload - |> Networking.send_to node) + Client.create_response node message + ?payload_signature:msg.payload_signature msg.payload + |> Networking.send_to node + | None -> + failwith "received request without payload nor payload_signature") | Acknowledgment -> let msg_hash = Bytes.to_string message.payload in let new_addrs = @@ -128,8 +131,7 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor (msg_handler : Message.t -> bytes * bytes option) - = +let rec run node preprocessor (msg_handler : Message.t -> Message.msg option) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/pnode/server.mli b/lib/pnode/server.mli index 2429b86..c7f25f8 100644 --- a/lib/pnode/server.mli +++ b/lib/pnode/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> bytes * bytes option) -> + (Message.t -> Message.msg option) -> 'b Lwt.t diff --git a/test/commons.ml b/test/commons.ml index 3fbb4e4..51af2c7 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -3,6 +3,7 @@ module Commons = struct open Pollinate.PNode open Pollinate.Util open Messages + open Message let preprocessor msg = let open Messages in @@ -19,7 +20,6 @@ module Commons = struct let msg_handler message = let open Messages in - let open Message in match message.category with | Request -> let request = Encoding.unpack bin_read_request message.payload in @@ -28,7 +28,13 @@ module Commons = struct | Ping -> Pong | Get -> Pong | Insert _ -> Success "Successfully added value to state" in - (Response response |> Encoding.pack bin_writer_message, None) - | Post -> (message.payload, None) (* ugly fix for tests. In case of a *) + let msg = + Message. + { + payload = Response response |> Encoding.pack bin_writer_message; + payload_signature = None; + } in + Some msg + | Post -> None | _ -> failwith "unhandled in tests" end From 68ed353a8be12074ec13b65b68e53820f319496a Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Thu, 7 Jul 2022 12:18:14 +0200 Subject: [PATCH 09/17] rework Message.msg --- lib/pnode/client.ml | 16 ++++++++-------- lib/pnode/failure_detector.ml | 6 +++--- lib/pnode/message.ml | 17 ++++++++--------- lib/pnode/message.mli | 15 +++++++-------- lib/pnode/node.mli | 2 +- lib/pnode/server.ml | 11 ++++++----- lib/pnode/server.mli | 2 +- test/commons.ml | 22 +++++++++++++++------- test/node_tests.ml | 6 +++--- 9 files changed, 52 insertions(+), 45 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index da69253..fdd87c3 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -35,8 +35,7 @@ let create_request node ?(request_ack = false) recipient ?payload_signature timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [recipient]; - payload; - payload_signature; + payload = { data = payload; signature = payload_signature }; }) let create_response node ?(request_ack = false) request ?payload_signature @@ -50,8 +49,7 @@ let create_response node ?(request_ack = false) request ?payload_signature timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [request.sender]; - payload; - payload_signature; + payload = { data = payload; signature = payload_signature }; } let create_post node ?(request_ack = false) ?payload_signature ?sub_category @@ -65,8 +63,7 @@ let create_post node ?(request_ack = false) ?payload_signature ?sub_category timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; - payload; - payload_signature; + payload = { data = payload; signature = payload_signature }; } let create_ack node incoming_message = @@ -79,8 +76,11 @@ let create_ack node incoming_message = timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [incoming_message.sender]; - payload = incoming_message |> Message.hash_of |> Bytes.of_string; - payload_signature = None; + payload = + { + data = incoming_message |> Message.hash_of |> Bytes.of_string; + signature = None; + }; } let request node message = diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index 2e680d7..29cfcea 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -64,8 +64,8 @@ let create_message node message recipient = timestamp = Unix.gettimeofday (); sender = Client.address_of !node; recipients = [recipient.Peer.address]; - payload = Encoding.pack bin_writer_message message; - payload_signature = None; + payload = + { data = Encoding.pack bin_writer_message message; signature = None }; } let send_message message node recipient = @@ -82,7 +82,7 @@ let send_ping_request_to node (recipient : Peer.t) = let handle_message node message = let open Message in let sender = Peer.from message.sender in - let msg = Encoding.unpack bin_read_message message.payload in + let msg = Encoding.unpack bin_read_message message.payload.data in let t = !node.failure_detector in match msg with | Ping -> send_acknowledge_to node sender diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index bc1b382..89b3c0d 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -11,6 +11,12 @@ type category = | Custom of string [@@deriving bin_io, show] +type payload = { + data : bytes; + signature : bytes option; +} +[@@deriving bin_io] + type t = { category : category; sub_category : (string * string) option; @@ -19,14 +25,7 @@ type t = { timestamp : float; sender : Address.t; recipients : Address.t list; - payload : bytes; - payload_signature : bytes option; -} -[@@deriving bin_io] - -type msg = { - payload : bytes; - payload_signature : bytes option; + payload : payload; } [@@deriving bin_io] @@ -35,7 +34,7 @@ let hash_of m = m.sender.address; string_of_int m.sender.port; string_of_float m.timestamp; - Bytes.to_string m.payload; + Bytes.to_string m.payload.data; ] |> String.concat "" |> Digest.string diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index f99a55c..21c963a 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -20,6 +20,12 @@ type category = | Custom of string [@@deriving bin_io, show] +type payload = { + data : bytes; + signature : bytes option; +} +[@@deriving bin_io] + (** Messages received from [peers] which are processed by the node's message handler. *) type t = { @@ -30,14 +36,7 @@ type t = { timestamp : float; sender : Address.t; recipients : Address.t list; - payload : bytes; - payload_signature : bytes option; -} -[@@deriving bin_io] - -type msg = { - payload : bytes; - payload_signature : bytes option; + payload : payload; } [@@deriving bin_io] diff --git a/lib/pnode/node.mli b/lib/pnode/node.mli index fc9dff5..b731ce5 100644 --- a/lib/pnode/node.mli +++ b/lib/pnode/node.mli @@ -14,7 +14,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> Message.msg option) -> + msg_handler:(Message.t -> Message.payload option) -> t ref -> 'b Lwt.t diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index 9c52f73..14fa016 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -34,7 +34,7 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) let process_message node preprocessor - (msg_handler : Message.t -> Message.msg option) = + (msg_handler : Message.t -> Message.payload option) = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -55,13 +55,13 @@ let process_message node preprocessor match msg_handler message with | Some msg -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) - Client.create_response node message - ?payload_signature:msg.payload_signature msg.payload + Client.create_response node message ?payload_signature:msg.signature + msg.data |> Networking.send_to node | None -> failwith "received request without payload nor payload_signature") | Acknowledgment -> - let msg_hash = Bytes.to_string message.payload in + let msg_hash = Bytes.to_string message.payload.data in let new_addrs = match Hashtbl.find_opt !node.acknowledgments msg_hash with | Some addrs -> AddressSet.add message.sender addrs @@ -131,7 +131,8 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor (msg_handler : Message.t -> Message.msg option) = +let rec run node preprocessor + (msg_handler : Message.t -> Message.payload option) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/pnode/server.mli b/lib/pnode/server.mli index c7f25f8..9bac75b 100644 --- a/lib/pnode/server.mli +++ b/lib/pnode/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> Message.msg option) -> + (Message.t -> Message.payload option) -> 'b Lwt.t diff --git a/test/commons.ml b/test/commons.ml index 51af2c7..03083bb 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -10,19 +10,27 @@ module Commons = struct match msg.Message.category with | Request -> let[@warning "-8"] (Request r) = - Encoding.unpack bin_read_message msg.Message.payload in - { msg with payload = Encoding.pack bin_writer_request r } + Encoding.unpack bin_read_message msg.Message.payload.data in + { + msg with + payload = + { data = Encoding.pack bin_writer_request r; signature = None }; + } | Response -> let[@warning "-8"] (Response r) = - Encoding.unpack bin_read_message msg.Message.payload in - { msg with payload = Encoding.pack bin_writer_response r } + Encoding.unpack bin_read_message msg.Message.payload.data in + { + msg with + payload = + { data = Encoding.pack bin_writer_response r; signature = None }; + } | _ -> msg let msg_handler message = let open Messages in match message.category with | Request -> - let request = Encoding.unpack bin_read_request message.payload in + let request = Encoding.unpack bin_read_request message.payload.data in let response = match request with | Ping -> Pong @@ -31,8 +39,8 @@ module Commons = struct let msg = Message. { - payload = Response response |> Encoding.pack bin_writer_message; - payload_signature = None; + data = Response response |> Encoding.pack bin_writer_message; + signature = None; } in Some msg | Post -> None diff --git a/test/node_tests.ml b/test/node_tests.ml index 5b84a6a..0466474 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -30,11 +30,11 @@ module Node_tests = struct let%lwt message = Client.create_request node_a peer_b.address get in let%lwt { payload = res_from_b; _ } = Client.request node_a message in - let res_from_b = Encoding.unpack bin_read_response res_from_b in + let res_from_b = Encoding.unpack bin_read_response res_from_b.data in let%lwt message = Client.create_request node_b peer_a.address get in let%lwt { payload = res_from_a; _ } = Client.request node_b message in - let res_from_a = Encoding.unpack bin_read_response res_from_a in + let res_from_a = Encoding.unpack bin_read_response res_from_a.data in let res_from_b, res_from_a = match (res_from_b, res_from_a) with @@ -54,7 +54,7 @@ module Node_tests = struct let%lwt message = Client.create_request node_a peer_b.address ping in let%lwt { payload = pong; _ } = Client.request node_a message in - let pong = Encoding.unpack bin_read_response pong in + let pong = Encoding.unpack bin_read_response pong.data in let pong = match pong with From e08af2ebfe53f45009afc8d6778d70212c5aa8ea Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Thu, 7 Jul 2022 12:32:19 +0200 Subject: [PATCH 10/17] fix bad pnode renaming --- lib/pnode/{node.ml => pnode.ml} | 0 lib/pnode/{node.mli => pnode.mli} | 0 test/disseminator_tests.ml | 18 +++++++++--------- test/failure_detector_prop.ml | 5 +++-- test/failure_detector_tests.ml | 6 +++--- test/gossip_tests.ml | 27 +++++++++++++++------------ test/networking_prop.ml | 5 +++-- test/node_tests.ml | 8 ++++---- 8 files changed, 37 insertions(+), 32 deletions(-) rename lib/pnode/{node.ml => pnode.ml} (100%) rename lib/pnode/{node.mli => pnode.mli} (100%) diff --git a/lib/pnode/node.ml b/lib/pnode/pnode.ml similarity index 100% rename from lib/pnode/node.ml rename to lib/pnode/pnode.ml diff --git a/lib/pnode/node.mli b/lib/pnode/pnode.mli similarity index 100% rename from lib/pnode/node.mli rename to lib/pnode/pnode.mli diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index 57e1f31..390c865 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -7,12 +7,12 @@ module Disseminator_tests = struct let node = Lwt_main.run (let%lwt node_a = - Node.init ~init_peers:[] Address.{ address = "127.0.0.1"; port = 5000 } - in + Pnode.init ~init_peers:[] + Address.{ address = "127.0.0.1"; port = 5000 } in Lwt.return node_a) let queue_insertion_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) @@ -20,10 +20,10 @@ module Disseminator_tests = struct |> String.to_bytes in Client.create_post node payload |> Client.post node; - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (Pnode.Testing.broadcast_queue node)) let queue_removal_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) @@ -32,13 +32,13 @@ module Disseminator_tests = struct Client.create_post node payload |> Client.post node; let%lwt () = - while%lwt Node.Testing.disseminator_round node <= 10 do + while%lwt Pnode.Testing.disseminator_round node <= 10 do Lwt_unix.sleep 0.1 done in - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (Pnode.Testing.broadcast_queue node)) let seen_message_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) @@ -47,7 +47,7 @@ module Disseminator_tests = struct let message = Client.create_post node payload in message |> Client.post node; - Lwt.return (Node.seen node message) + Lwt.return (Pnode.seen node message) end (** Test for dissemination given a specific node. *) diff --git a/test/failure_detector_prop.ml b/test/failure_detector_prop.ml index ff45a9f..1197263 100644 --- a/test/failure_detector_prop.ml +++ b/test/failure_detector_prop.ml @@ -1,10 +1,11 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.PNode.Testing.Failure_detector +module SUT = Pollinate.PNode.Pnode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 }) + Lwt_main.run + (Pollinate.PNode.Pnode.init Address.{ address = "127.0.0.1"; port = 3002 }) let update_peer = QCheck2.Test.make ~count:1000 diff --git a/test/failure_detector_tests.ml b/test/failure_detector_tests.ml index 53d91f6..490b1ef 100644 --- a/test/failure_detector_tests.ml +++ b/test/failure_detector_tests.ml @@ -1,13 +1,13 @@ open Pollinate open Pollinate.PNode open Lwt.Infix -module SUT = Pollinate.PNode.Testing.Failure_detector +module SUT = Pollinate.PNode.Pnode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3003 }) let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3004 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3004 }) let peer_b = Client.peer_from !node_b diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 0074ce3..74c8346 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -41,16 +41,19 @@ module Gossip_tests = struct local_address 4009 ) in let%lwt node_a = - Node.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in - let%lwt node_b = Node.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in - let%lwt node_c = Node.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in - let%lwt node_d = Node.init ~init_peers:[addr_b] addr_d in - let%lwt node_e = Node.init ~init_peers:[addr_a; addr_b] addr_e in - let%lwt node_f = Node.init ~init_peers:[addr_c] addr_f in - let%lwt node_g = Node.init ~init_peers:[addr_c] addr_g in - let%lwt node_h = Node.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in - let%lwt node_i = Node.init ~init_peers:[addr_h] addr_i in - let%lwt node_j = Node.init ~init_peers:[addr_h] addr_j in + Pnode.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in + let%lwt node_b = + Pnode.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in + let%lwt node_c = + Pnode.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in + let%lwt node_d = Pnode.init ~init_peers:[addr_b] addr_d in + let%lwt node_e = Pnode.init ~init_peers:[addr_a; addr_b] addr_e in + let%lwt node_f = Pnode.init ~init_peers:[addr_c] addr_f in + let%lwt node_g = Pnode.init ~init_peers:[addr_c] addr_g in + let%lwt node_h = + Pnode.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in + let%lwt node_i = Pnode.init ~init_peers:[addr_h] addr_i in + let%lwt node_j = Pnode.init ~init_peers:[addr_h] addr_j in Lwt.return ( node_a, node_b, @@ -96,7 +99,7 @@ module Gossip_tests = struct (* Start the server for each node in a thread *) let _ = List.map - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) nodes in @@ -121,7 +124,7 @@ module Gossip_tests = struct (* Compute the list of nodes who have seen the post. *) let list_of_seen = Hashtbl.find !node.acknowledgments (Message.hash_of message) - |> Testing.AddressSet.to_seq + |> Pnode.Testing.AddressSet.to_seq |> Seq.map (fun address -> address.Address.port) |> List.of_seq in diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 660f8fd..6c93bde 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -1,10 +1,11 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.PNode.Testing.Networking +module SUT = Pollinate.PNode.Pnode.Testing.Networking let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 2002 }) + Lwt_main.run + (Pollinate.PNode.Pnode.init Address.{ address = "127.0.0.1"; port = 2002 }) let knuth_shuffle_size = QCheck2.Test.make ~count:1000 diff --git a/test/node_tests.ml b/test/node_tests.ml index 0466474..3b8436b 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -8,12 +8,12 @@ open Messages module Node_tests = struct (* Initializes two nodes and the related two peers *) let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3000 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3000 }) let peer_a = Client.peer_from !node_a let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3001 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3001 }) let peer_b = Client.peer_from !node_b @@ -23,7 +23,7 @@ module Node_tests = struct let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let get = Encoding.pack bin_writer_message (Request Get) in @@ -47,7 +47,7 @@ module Node_tests = struct let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in From 219aeb798fc7db5aaa0dac5133121601338a0675 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Thu, 7 Jul 2022 12:39:04 +0200 Subject: [PATCH 11/17] fix the fix --- lib/pnode/dune | 2 +- lib/pollinate.ml | 2 +- test/failure_detector_prop.ml | 7 +++---- test/failure_detector_tests.ml | 6 +++--- test/networking_prop.ml | 5 ++--- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/lib/pnode/dune b/lib/pnode/dune index 564f9a3..ed5e578 100644 --- a/lib/pnode/dune +++ b/lib/pnode/dune @@ -1,5 +1,5 @@ (library - (name node) + (name pnode) (public_name pollinate.pnode) (libraries bin_prot lwt lwt.unix pollinate.common) (preprocess diff --git a/lib/pollinate.ml b/lib/pollinate.ml index 2d02d90..ab4d85a 100644 --- a/lib/pollinate.ml +++ b/lib/pollinate.ml @@ -1,4 +1,4 @@ module Address = Common.Address module Util = Common.Util module Peer = Common.Peer -module PNode = Node +module PNode = Pnode diff --git a/test/failure_detector_prop.ml b/test/failure_detector_prop.ml index 1197263..eddfde0 100644 --- a/test/failure_detector_prop.ml +++ b/test/failure_detector_prop.ml @@ -1,18 +1,17 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.PNode.Pnode.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = - Lwt_main.run - (Pollinate.PNode.Pnode.init Address.{ address = "127.0.0.1"; port = 3002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3002 }) let update_peer = QCheck2.Test.make ~count:1000 ~name:"update_neighbor_status successfully update neighbor status" (pair Generators.peer_gen Generators.peer_status_gen) (fun (neighbor, neighbor_status) -> - let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) neighbor in + let _ = add_neighbor (PNode.Client.peer_from !node_a) neighbor in let _ = SUT.update_peer_status node_a neighbor neighbor_status in neighbor.status = neighbor_status) diff --git a/test/failure_detector_tests.ml b/test/failure_detector_tests.ml index 490b1ef..45c7597 100644 --- a/test/failure_detector_tests.ml +++ b/test/failure_detector_tests.ml @@ -1,7 +1,7 @@ open Pollinate open Pollinate.PNode open Lwt.Infix -module SUT = Pollinate.PNode.Pnode.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3003 }) @@ -15,7 +15,7 @@ let failure_detection () = let open Common.Peer in let open Client in let _ = add_peer_as_is !node_a peer_b in - let _ = Pollinate.PNode.Client.peer_from !node_a in + let _ = PNode.Client.peer_from !node_a in let _ = SUT.update_peer_status node_a peer_b Suspicious in (* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *) let%lwt _ = Lwt_unix.sleep 9.1 in @@ -24,7 +24,7 @@ let failure_detection () = let failure_detection_nothing_on_alive () = let open Common.Peer in - let _ = add_neighbor (Pollinate.PNode.Client.peer_from !node_a) peer_b in + let _ = add_neighbor (PNode.Client.peer_from !node_a) peer_b in let _ = SUT.update_peer_status node_a peer_b Alive in let%lwt _ = SUT.failure_detection node_a in Lwt.return (Base.Hashtbl.length !node_a.peers = 1) diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 6c93bde..48c4cd1 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -1,11 +1,10 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.PNode.Pnode.Testing.Networking +module SUT = Pollinate.PNode.Testing.Networking let node_a = - Lwt_main.run - (Pollinate.PNode.Pnode.init Address.{ address = "127.0.0.1"; port = 2002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 2002 }) let knuth_shuffle_size = QCheck2.Test.make ~count:1000 From c57755e843b8e119e97b8313e92489e4749dc914 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Fri, 8 Jul 2022 09:00:26 +0200 Subject: [PATCH 12/17] last review --- lib/pnode/server.ml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index 14fa016..619d7bd 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -53,10 +53,9 @@ let process_message node preprocessor !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | Some msg -> + | Some Message.{ data; signature = payload_signature } -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) - Client.create_response node message ?payload_signature:msg.signature - msg.data + Client.create_response node message ?payload_signature data |> Networking.send_to node | None -> failwith "received request without payload nor payload_signature") From 17848cf15001dc1362d4689b14e8cd05839e88aa Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Fri, 8 Jul 2022 15:25:23 +0200 Subject: [PATCH 13/17] rework category and sub_category --- lib/pnode/client.ml | 24 ++++++++++++------------ lib/pnode/client.mli | 6 +++--- lib/pnode/failure_detector.ml | 4 ++-- lib/pnode/message.ml | 12 +++++++++--- lib/pnode/message.mli | 12 +++++++++--- lib/pnode/server.ml | 2 +- test/commons.ml | 4 ++-- 7 files changed, 38 insertions(+), 26 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index fdd87c3..c5a228e 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -22,14 +22,14 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers let create_request node ?(request_ack = false) recipient ?payload_signature - ?sub_category payload = + ?category payload = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return Message. { - category = Message.Request; - sub_category; + pollinate_category = Message.Request; + category; request_ack; id = !id; timestamp = Unix.gettimeofday (); @@ -39,11 +39,11 @@ let create_request node ?(request_ack = false) recipient ?payload_signature }) let create_response node ?(request_ack = false) request ?payload_signature - ?sub_category payload = + ?category payload = Message. { - category = Message.Response; - sub_category; + pollinate_category = Message.Response; + category; request_ack; id = request.id; timestamp = Unix.gettimeofday (); @@ -52,14 +52,14 @@ let create_response node ?(request_ack = false) request ?payload_signature payload = { data = payload; signature = payload_signature }; } -let create_post node ?(request_ack = false) ?payload_signature ?sub_category - payload = +let create_post node ?(request_ack = false) ?payload_signature ?category payload + = Message. { - category = Message.Post; + pollinate_category = Message.Post; request_ack; id = -1; - sub_category; + category; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; @@ -69,8 +69,8 @@ let create_post node ?(request_ack = false) ?payload_signature ?sub_category let create_ack node incoming_message = Message. { - category = Message.Acknowledgment; - sub_category = None; + pollinate_category = Message.Acknowledgment; + category = None; request_ack = false; id = -1; timestamp = Unix.gettimeofday (); diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index ef1f20a..3eb465d 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -31,7 +31,7 @@ val create_request : ?request_ack:bool -> Address.t -> ?payload_signature:bytes -> - ?sub_category:string * string -> + ?category:Message.category -> bytes -> Message.t Lwt.t @@ -42,7 +42,7 @@ val create_response : ?request_ack:bool -> Message.t -> ?payload_signature:bytes -> - ?sub_category:string * string -> + ?category:Message.category -> bytes -> Message.t @@ -59,7 +59,7 @@ val create_post : node ref -> ?request_ack:bool -> ?payload_signature:bytes -> - ?sub_category:string * string -> + ?category:Message.category -> bytes -> Message.t diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index 29cfcea..315f0ec 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -57,9 +57,9 @@ let update_peer_status node peer status = let create_message node message recipient = Message. { - category = Failure_detection; + pollinate_category = Failure_detection; request_ack = false; - sub_category = None; + category = None; id = -1; timestamp = Unix.gettimeofday (); sender = Client.address_of !node; diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index 89b3c0d..5643453 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -1,7 +1,7 @@ open Common open Bin_prot.Std -type category = +type pollinate_category = | Uncategorized | Acknowledgment | Request @@ -17,9 +17,15 @@ type payload = { } [@@deriving bin_io] +type category = { + category : string; + sub_category : string option; +} +[@@deriving bin_io] + type t = { - category : category; - sub_category : (string * string) option; + pollinate_category : pollinate_category; + category : category option; request_ack : bool; id : int; timestamp : float; diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index 21c963a..631b8fd 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -10,7 +10,7 @@ open Common (** Messages are {i requests} or {i responses}, determining how they are stored and where they are handled. *) -type category = +type pollinate_category = | Uncategorized | Acknowledgment | Request @@ -26,11 +26,17 @@ type payload = { } [@@deriving bin_io] +type category = { + category : string; + sub_category : string option; +} +[@@deriving bin_io] + (** Messages received from [peers] which are processed by the node's message handler. *) type t = { - category : category; - sub_category : (string * string) option; + pollinate_category : pollinate_category; + category : category option; request_ack : bool; id : int; timestamp : float; diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index 619d7bd..b4e58ea 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -44,7 +44,7 @@ let process_message node preprocessor (Printf.sprintf "Processing message %s from %d...\n" (Message.hash_of message) message.sender.port) in *) let%lwt () = - match message.category with + match message.pollinate_category with | Response -> Lwt.return (handle_response node message) | Request -> ( (* let%lwt () = diff --git a/test/commons.ml b/test/commons.ml index 03083bb..d47eb89 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -7,7 +7,7 @@ module Commons = struct let preprocessor msg = let open Messages in - match msg.Message.category with + match msg.Message.pollinate_category with | Request -> let[@warning "-8"] (Request r) = Encoding.unpack bin_read_message msg.Message.payload.data in @@ -28,7 +28,7 @@ module Commons = struct let msg_handler message = let open Messages in - match message.category with + match message.pollinate_category with | Request -> let request = Encoding.unpack bin_read_request message.payload.data in let response = From e3e337dee049ce09523090b1163d1eb9b0140b04 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Fri, 8 Jul 2022 15:35:00 +0200 Subject: [PATCH 14/17] rename to operation --- lib/pnode/client.ml | 16 ++++++++-------- lib/pnode/client.mli | 6 +++--- lib/pnode/failure_detector.ml | 2 +- lib/pnode/message.ml | 6 +++--- lib/pnode/message.mli | 6 +++--- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index c5a228e..642c10c 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -22,14 +22,14 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers let create_request node ?(request_ack = false) recipient ?payload_signature - ?category payload = + ?operation payload = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return Message. { pollinate_category = Message.Request; - category; + operation; request_ack; id = !id; timestamp = Unix.gettimeofday (); @@ -39,11 +39,11 @@ let create_request node ?(request_ack = false) recipient ?payload_signature }) let create_response node ?(request_ack = false) request ?payload_signature - ?category payload = + ?operation payload = Message. { pollinate_category = Message.Response; - category; + operation; request_ack; id = request.id; timestamp = Unix.gettimeofday (); @@ -52,14 +52,14 @@ let create_response node ?(request_ack = false) request ?payload_signature payload = { data = payload; signature = payload_signature }; } -let create_post node ?(request_ack = false) ?payload_signature ?category payload - = +let create_post node ?(request_ack = false) ?payload_signature ?operation + payload = Message. { pollinate_category = Message.Post; request_ack; id = -1; - category; + operation; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; @@ -70,7 +70,7 @@ let create_ack node incoming_message = Message. { pollinate_category = Message.Acknowledgment; - category = None; + operation = None; request_ack = false; id = -1; timestamp = Unix.gettimeofday (); diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index 3eb465d..50c2a43 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -31,7 +31,7 @@ val create_request : ?request_ack:bool -> Address.t -> ?payload_signature:bytes -> - ?category:Message.category -> + ?operation:Message.operation -> bytes -> Message.t Lwt.t @@ -42,7 +42,7 @@ val create_response : ?request_ack:bool -> Message.t -> ?payload_signature:bytes -> - ?category:Message.category -> + ?operation:Message.operation -> bytes -> Message.t @@ -59,7 +59,7 @@ val create_post : node ref -> ?request_ack:bool -> ?payload_signature:bytes -> - ?category:Message.category -> + ?operation:Message.operation -> bytes -> Message.t diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index 315f0ec..f382057 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -59,7 +59,7 @@ let create_message node message recipient = { pollinate_category = Failure_detection; request_ack = false; - category = None; + operation = None; id = -1; timestamp = Unix.gettimeofday (); sender = Client.address_of !node; diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index 5643453..0b18f51 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -17,15 +17,15 @@ type payload = { } [@@deriving bin_io] -type category = { +type operation = { category : string; - sub_category : string option; + name : string option; } [@@deriving bin_io] type t = { pollinate_category : pollinate_category; - category : category option; + operation : operation option; request_ack : bool; id : int; timestamp : float; diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index 631b8fd..3954db9 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -26,9 +26,9 @@ type payload = { } [@@deriving bin_io] -type category = { +type operation = { category : string; - sub_category : string option; + name : string option; } [@@deriving bin_io] @@ -36,7 +36,7 @@ type category = { processed by the node's message handler. *) type t = { pollinate_category : pollinate_category; - category : category option; + operation : operation option; request_ack : bool; id : int; timestamp : float; From d3c1e0101e82d9c708a977ac89efec10a40c1c28 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Fri, 8 Jul 2022 16:16:21 +0200 Subject: [PATCH 15/17] operation to bytes for performance --- lib/pnode/message.ml | 4 ++-- lib/pnode/message.mli | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index 0b18f51..14abc73 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -18,8 +18,8 @@ type payload = { [@@deriving bin_io] type operation = { - category : string; - name : string option; + category : bytes; + name : bytes option; } [@@deriving bin_io] diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index 3954db9..de252de 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -27,8 +27,8 @@ type payload = { [@@deriving bin_io] type operation = { - category : string; - name : string option; + category : bytes; + name : bytes option; } [@@deriving bin_io] From f7a30dd363b5049aa4997e9068316dcbe05debe8 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Mon, 11 Jul 2022 10:01:42 +0200 Subject: [PATCH 16/17] total rework the payload to make it generic --- lib/pnode/client.ml | 25 +++++++------------------ lib/pnode/client.mli | 24 +++--------------------- lib/pnode/failure_detector.ml | 6 ++---- lib/pnode/message.ml | 17 ++--------------- lib/pnode/message.mli | 15 +-------------- lib/pnode/pnode.mli | 2 +- lib/pnode/server.ml | 14 ++++++-------- lib/pnode/server.mli | 2 +- test/commons.ml | 25 ++++++------------------- test/node_tests.ml | 6 +++--- 10 files changed, 32 insertions(+), 104 deletions(-) diff --git a/lib/pnode/client.ml b/lib/pnode/client.ml index 642c10c..787ad48 100644 --- a/lib/pnode/client.ml +++ b/lib/pnode/client.ml @@ -21,66 +21,55 @@ let add_peer_as_is node (peer : Peer.t) = let peers node = Base.Hashtbl.keys node.peers -let create_request node ?(request_ack = false) recipient ?payload_signature - ?operation payload = +let create_request node ?(request_ack = false) recipient payload = Mutex.with_lock !node.current_request_id (fun id -> id := !id + 1; Lwt.return Message. { pollinate_category = Message.Request; - operation; request_ack; id = !id; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [recipient]; - payload = { data = payload; signature = payload_signature }; + payload; }) -let create_response node ?(request_ack = false) request ?payload_signature - ?operation payload = +let create_response node ?(request_ack = false) request payload = Message. { pollinate_category = Message.Response; - operation; request_ack; id = request.id; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [request.sender]; - payload = { data = payload; signature = payload_signature }; + payload; } -let create_post node ?(request_ack = false) ?payload_signature ?operation - payload = +let create_post node ?(request_ack = false) payload = Message. { pollinate_category = Message.Post; request_ack; id = -1; - operation; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; - payload = { data = payload; signature = payload_signature }; + payload; } let create_ack node incoming_message = Message. { pollinate_category = Message.Acknowledgment; - operation = None; request_ack = false; id = -1; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = [incoming_message.sender]; - payload = - { - data = incoming_message |> Message.hash_of |> Bytes.of_string; - signature = None; - }; + payload = incoming_message |> Message.hash_of |> Bytes.of_string; } let request node message = diff --git a/lib/pnode/client.mli b/lib/pnode/client.mli index 50c2a43..11b726a 100644 --- a/lib/pnode/client.mli +++ b/lib/pnode/client.mli @@ -27,24 +27,12 @@ val post : node ref -> Message.t -> unit (** [create_request node recipient payload] creates a [Message.t] of the {i Request category} addressed to {i recipient} containing {i payload}. *) val create_request : - node ref -> - ?request_ack:bool -> - Address.t -> - ?payload_signature:bytes -> - ?operation:Message.operation -> - bytes -> - Message.t Lwt.t + node ref -> ?request_ack:bool -> Address.t -> bytes -> Message.t Lwt.t (** [create_response node request payload] creates a [Message.t] of the {i Response category} that responds to {i request} whose content is {i payload}. *) val create_response : - node ref -> - ?request_ack:bool -> - Message.t -> - ?payload_signature:bytes -> - ?operation:Message.operation -> - bytes -> - Message.t + node ref -> ?request_ack:bool -> Message.t -> bytes -> Message.t (** Sends an encoded {i request} to the specified peer and returns a promise holding the response from the peer. This @@ -55,13 +43,7 @@ val request : node ref -> Message.t -> Message.t Lwt.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the entire network. *) -val create_post : - node ref -> - ?request_ack:bool -> - ?payload_signature:bytes -> - ?operation:Message.operation -> - bytes -> - Message.t +val create_post : node ref -> ?request_ack:bool -> bytes -> Message.t val create_ack : node ref -> Message.t -> Message.t diff --git a/lib/pnode/failure_detector.ml b/lib/pnode/failure_detector.ml index f382057..cb9fd6f 100644 --- a/lib/pnode/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -59,13 +59,11 @@ let create_message node message recipient = { pollinate_category = Failure_detection; request_ack = false; - operation = None; id = -1; timestamp = Unix.gettimeofday (); sender = Client.address_of !node; recipients = [recipient.Peer.address]; - payload = - { data = Encoding.pack bin_writer_message message; signature = None }; + payload = Encoding.pack bin_writer_message message; } let send_message message node recipient = @@ -82,7 +80,7 @@ let send_ping_request_to node (recipient : Peer.t) = let handle_message node message = let open Message in let sender = Peer.from message.sender in - let msg = Encoding.unpack bin_read_message message.payload.data in + let msg = Encoding.unpack bin_read_message message.payload in let t = !node.failure_detector in match msg with | Ping -> send_acknowledge_to node sender diff --git a/lib/pnode/message.ml b/lib/pnode/message.ml index 14abc73..1f5c722 100644 --- a/lib/pnode/message.ml +++ b/lib/pnode/message.ml @@ -11,27 +11,14 @@ type pollinate_category = | Custom of string [@@deriving bin_io, show] -type payload = { - data : bytes; - signature : bytes option; -} -[@@deriving bin_io] - -type operation = { - category : bytes; - name : bytes option; -} -[@@deriving bin_io] - type t = { pollinate_category : pollinate_category; - operation : operation option; request_ack : bool; id : int; timestamp : float; sender : Address.t; recipients : Address.t list; - payload : payload; + payload : bytes; } [@@deriving bin_io] @@ -40,7 +27,7 @@ let hash_of m = m.sender.address; string_of_int m.sender.port; string_of_float m.timestamp; - Bytes.to_string m.payload.data; + Bytes.to_string m.payload; ] |> String.concat "" |> Digest.string diff --git a/lib/pnode/message.mli b/lib/pnode/message.mli index de252de..15efc58 100644 --- a/lib/pnode/message.mli +++ b/lib/pnode/message.mli @@ -20,29 +20,16 @@ type pollinate_category = | Custom of string [@@deriving bin_io, show] -type payload = { - data : bytes; - signature : bytes option; -} -[@@deriving bin_io] - -type operation = { - category : bytes; - name : bytes option; -} -[@@deriving bin_io] - (** Messages received from [peers] which are processed by the node's message handler. *) type t = { pollinate_category : pollinate_category; - operation : operation option; request_ack : bool; id : int; timestamp : float; sender : Address.t; recipients : Address.t list; - payload : payload; + payload : bytes; } [@@deriving bin_io] diff --git a/lib/pnode/pnode.mli b/lib/pnode/pnode.mli index b731ce5..87ccd3d 100644 --- a/lib/pnode/pnode.mli +++ b/lib/pnode/pnode.mli @@ -14,7 +14,7 @@ val init : ?init_peers:Address.t list -> Address.t -> t ref Lwt.t val run_server : ?preprocessor:(Message.t -> Message.t) -> - msg_handler:(Message.t -> Message.payload option) -> + msg_handler:(Message.t -> bytes option) -> t ref -> 'b Lwt.t diff --git a/lib/pnode/server.ml b/lib/pnode/server.ml index b4e58ea..d23efa0 100644 --- a/lib/pnode/server.ml +++ b/lib/pnode/server.ml @@ -33,8 +33,8 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) -let process_message node preprocessor - (msg_handler : Message.t -> Message.payload option) = +let process_message node preprocessor (msg_handler : Message.t -> bytes option) + = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -53,14 +53,13 @@ let process_message node preprocessor !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | Some Message.{ data; signature = payload_signature } -> + | Some payload -> (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) - Client.create_response node message ?payload_signature data - |> Networking.send_to node + Client.create_response node message payload |> Networking.send_to node | None -> failwith "received request without payload nor payload_signature") | Acknowledgment -> - let msg_hash = Bytes.to_string message.payload.data in + let msg_hash = Bytes.to_string message.payload in let new_addrs = match Hashtbl.find_opt !node.acknowledgments msg_hash with | Some addrs -> AddressSet.add message.sender addrs @@ -130,8 +129,7 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor - (msg_handler : Message.t -> Message.payload option) = +let rec run node preprocessor (msg_handler : Message.t -> bytes option) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/pnode/server.mli b/lib/pnode/server.mli index 9bac75b..2cfa48c 100644 --- a/lib/pnode/server.mli +++ b/lib/pnode/server.mli @@ -4,5 +4,5 @@ val run : Types.node ref -> (Message.t -> Message.t) -> - (Message.t -> Message.payload option) -> + (Message.t -> bytes option) -> 'b Lwt.t diff --git a/test/commons.ml b/test/commons.ml index d47eb89..3b55d88 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -10,38 +10,25 @@ module Commons = struct match msg.Message.pollinate_category with | Request -> let[@warning "-8"] (Request r) = - Encoding.unpack bin_read_message msg.Message.payload.data in - { - msg with - payload = - { data = Encoding.pack bin_writer_request r; signature = None }; - } + Encoding.unpack bin_read_message msg.Message.payload in + { msg with payload = Encoding.pack bin_writer_request r } | Response -> let[@warning "-8"] (Response r) = - Encoding.unpack bin_read_message msg.Message.payload.data in - { - msg with - payload = - { data = Encoding.pack bin_writer_response r; signature = None }; - } + Encoding.unpack bin_read_message msg.Message.payload in + { msg with payload = Encoding.pack bin_writer_response r } | _ -> msg let msg_handler message = let open Messages in match message.pollinate_category with | Request -> - let request = Encoding.unpack bin_read_request message.payload.data in + let request = Encoding.unpack bin_read_request message.payload in let response = match request with | Ping -> Pong | Get -> Pong | Insert _ -> Success "Successfully added value to state" in - let msg = - Message. - { - data = Response response |> Encoding.pack bin_writer_message; - signature = None; - } in + let msg = Response response |> Encoding.pack bin_writer_message in Some msg | Post -> None | _ -> failwith "unhandled in tests" diff --git a/test/node_tests.ml b/test/node_tests.ml index 3b8436b..07ca0ec 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -30,11 +30,11 @@ module Node_tests = struct let%lwt message = Client.create_request node_a peer_b.address get in let%lwt { payload = res_from_b; _ } = Client.request node_a message in - let res_from_b = Encoding.unpack bin_read_response res_from_b.data in + let res_from_b = Encoding.unpack bin_read_response res_from_b in let%lwt message = Client.create_request node_b peer_a.address get in let%lwt { payload = res_from_a; _ } = Client.request node_b message in - let res_from_a = Encoding.unpack bin_read_response res_from_a.data in + let res_from_a = Encoding.unpack bin_read_response res_from_a in let res_from_b, res_from_a = match (res_from_b, res_from_a) with @@ -54,7 +54,7 @@ module Node_tests = struct let%lwt message = Client.create_request node_a peer_b.address ping in let%lwt { payload = pong; _ } = Client.request node_a message in - let pong = Encoding.unpack bin_read_response pong.data in + let pong = Encoding.unpack bin_read_response pong in let pong = match pong with From 344f9d831f8532b560660d71495abf00e8a59b27 Mon Sep 17 00:00:00 2001 From: Gauthier SEBILLE Date: Mon, 11 Jul 2022 11:15:20 +0200 Subject: [PATCH 17/17] comment log in tests --- test/gossip_tests.ml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index 74c8346..1af3ebb 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -129,14 +129,14 @@ module Gossip_tests = struct |> List.of_seq in (* Write the length of list_of_seen to a tmp log file *) - let%lwt () = - let%lwt oc = - Lwt_io.open_file - ~flags:[Unix.O_WRONLY; Unix.O_APPEND; Unix.O_CREAT] - ~mode:Lwt_io.Output "/tmp/log.txt" in - let%lwt () = - Lwt_io.write oc (Printf.sprintf "%d\n" (List.length list_of_seen)) in - Lwt_io.close oc in + (* let%lwt () = + let%lwt oc = + Lwt_io.open_file + ~flags:[Unix.O_WRONLY; Unix.O_APPEND; Unix.O_CREAT] + ~mode:Lwt_io.Output "/tmp/log.txt" in + let%lwt () = + Lwt_io.write oc (Printf.sprintf "%d\n" (List.length list_of_seen)) in + Lwt_io.close oc in *) (* End of logging code *) Lwt.return list_of_seen end