diff --git a/lib/dune b/lib/dune index f5c1898..71009f1 100644 --- a/lib/dune +++ b/lib/dune @@ -1,4 +1,4 @@ (library (name pollinate) (public_name pollinate) - (libraries pollinate.common pollinate.node)) + (libraries pollinate.common pollinate.pnode)) diff --git a/lib/node/client.ml b/lib/pnode/client.ml similarity index 83% rename from lib/node/client.ml rename to lib/pnode/client.ml index 71fa904..787ad48 100644 --- a/lib/node/client.ml +++ b/lib/pnode/client.ml @@ -27,8 +27,7 @@ let create_request node ?(request_ack = false) recipient payload = Lwt.return Message. { - category = Message.Request; - sub_category_opt = None; + pollinate_category = Message.Request; request_ack; id = !id; timestamp = Unix.gettimeofday (); @@ -40,8 +39,7 @@ let create_request node ?(request_ack = false) recipient payload = let create_response node ?(request_ack = false) request payload = Message. { - category = Message.Response; - sub_category_opt = None; + pollinate_category = Message.Response; request_ack; id = request.id; timestamp = Unix.gettimeofday (); @@ -53,10 +51,9 @@ let create_response node ?(request_ack = false) request payload = let create_post node ?(request_ack = false) payload = Message. { - category = Message.Post; + pollinate_category = Message.Post; request_ack; id = -1; - sub_category_opt = None; timestamp = Unix.gettimeofday (); sender = !node.address; recipients = []; @@ -66,8 +63,7 @@ let create_post node ?(request_ack = false) payload = let create_ack node incoming_message = Message. { - category = Message.Acknowledgment; - sub_category_opt = None; + pollinate_category = Message.Acknowledgment; request_ack = false; id = -1; timestamp = Unix.gettimeofday (); @@ -76,8 +72,7 @@ let create_ack node incoming_message = payload = incoming_message |> Message.hash_of |> Bytes.of_string; } -let request node request recipient = - let%lwt message = create_request node recipient request in +let request node message = let%lwt () = Networking.send_to node message in let condition_var = Lwt_condition.create () in Hashtbl.add !node.request_table message.id condition_var; @@ -85,3 +80,6 @@ let request node request recipient = let post node message = !node.disseminator <- Disseminator.post !node.disseminator message + +let broadcast node message recipients = + Networking.broadcast node message recipients diff --git a/lib/node/client.mli b/lib/pnode/client.mli similarity index 93% rename from lib/node/client.mli rename to lib/pnode/client.mli index eb4c2ce..11b726a 100644 --- a/lib/node/client.mli +++ b/lib/pnode/client.mli @@ -38,7 +38,7 @@ val create_response : returns a promise holding the response from the peer. This function blocks the current thread of execution until a response arrives. *) -val request : node ref -> bytes -> Address.t -> Message.t Lwt.t +val request : node ref -> Message.t -> Message.t Lwt.t (** [create_post node payload] creates a [Message.t] of the {i Post category} containing {i payload} for eventual gossip dissemination across the @@ -46,3 +46,5 @@ val request : node ref -> bytes -> Address.t -> Message.t Lwt.t val create_post : node ref -> ?request_ack:bool -> bytes -> Message.t val create_ack : node ref -> Message.t -> Message.t + +val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t diff --git a/lib/node/disseminator.ml b/lib/pnode/disseminator.ml similarity index 100% rename from lib/node/disseminator.ml rename to lib/pnode/disseminator.ml diff --git a/lib/node/disseminator.mli b/lib/pnode/disseminator.mli similarity index 100% rename from lib/node/disseminator.mli rename to lib/pnode/disseminator.mli diff --git a/lib/node/dune b/lib/pnode/dune similarity index 73% rename from lib/node/dune rename to lib/pnode/dune index b911382..ed5e578 100644 --- a/lib/node/dune +++ b/lib/pnode/dune @@ -1,6 +1,6 @@ (library - (name node) - (public_name pollinate.node) + (name pnode) + (public_name pollinate.pnode) (libraries bin_prot lwt lwt.unix pollinate.common) (preprocess (pps ppx_bin_prot ppx_deriving.show lwt_ppx))) diff --git a/lib/node/failure_detector.ml b/lib/pnode/failure_detector.ml similarity index 98% rename from lib/node/failure_detector.ml rename to lib/pnode/failure_detector.ml index 43f0cae..cb9fd6f 100644 --- a/lib/node/failure_detector.ml +++ b/lib/pnode/failure_detector.ml @@ -57,9 +57,8 @@ let update_peer_status node peer status = let create_message node message recipient = Message. { - category = Failure_detection; + pollinate_category = Failure_detection; request_ack = false; - sub_category_opt = None; id = -1; timestamp = Unix.gettimeofday (); sender = Client.address_of !node; diff --git a/lib/node/failure_detector.mli b/lib/pnode/failure_detector.mli similarity index 100% rename from lib/node/failure_detector.mli rename to lib/pnode/failure_detector.mli diff --git a/lib/node/message.ml b/lib/pnode/message.ml similarity index 87% rename from lib/node/message.ml rename to lib/pnode/message.ml index b451906..1f5c722 100644 --- a/lib/node/message.ml +++ b/lib/pnode/message.ml @@ -1,7 +1,7 @@ open Common open Bin_prot.Std -type category = +type pollinate_category = | Uncategorized | Acknowledgment | Request @@ -12,8 +12,7 @@ type category = [@@deriving bin_io, show] type t = { - category : category; - sub_category_opt : (string * string) option; + pollinate_category : pollinate_category; request_ack : bool; id : int; timestamp : float; diff --git a/lib/node/message.mli b/lib/pnode/message.mli similarity index 89% rename from lib/node/message.mli rename to lib/pnode/message.mli index 6f3be77..15efc58 100644 --- a/lib/node/message.mli +++ b/lib/pnode/message.mli @@ -10,7 +10,7 @@ open Common (** Messages are {i requests} or {i responses}, determining how they are stored and where they are handled. *) -type category = +type pollinate_category = | Uncategorized | Acknowledgment | Request @@ -23,8 +23,7 @@ type category = (** Messages received from [peers] which are processed by the node's message handler. *) type t = { - category : category; - sub_category_opt : (string * string) option; + pollinate_category : pollinate_category; request_ack : bool; id : int; timestamp : float; diff --git a/lib/node/networking.ml b/lib/pnode/networking.ml similarity index 100% rename from lib/node/networking.ml rename to lib/pnode/networking.ml diff --git a/lib/node/networking.mli b/lib/pnode/networking.mli similarity index 87% rename from lib/node/networking.mli rename to lib/pnode/networking.mli index e818c96..58bc477 100644 --- a/lib/node/networking.mli +++ b/lib/pnode/networking.mli @@ -6,6 +6,9 @@ to a specified peer within the [Message.t]. Construct a message with one of the [create_*] functions to then feed to this function. *) val send_to : node ref -> Message.t -> unit Lwt.t +(** Same as `send_to` but to several recipients *) +val broadcast : node ref -> Message.t -> Address.t list -> unit Lwt.t + (** Waits for the next incoming message and returns it. *) val recv_next : node ref -> Message.t Lwt.t diff --git a/lib/node/node.ml b/lib/pnode/pnode.ml similarity index 100% rename from lib/node/node.ml rename to lib/pnode/pnode.ml diff --git a/lib/node/node.mli b/lib/pnode/pnode.mli similarity index 100% rename from lib/node/node.mli rename to lib/pnode/pnode.mli diff --git a/lib/node/server.ml b/lib/pnode/server.ml similarity index 91% rename from lib/node/server.ml rename to lib/pnode/server.ml index a5243bc..d23efa0 100644 --- a/lib/node/server.ml +++ b/lib/pnode/server.ml @@ -33,7 +33,8 @@ let handle_ack node msg = Otherwise, we just apply the message handler and that's it. *) -let process_message node preprocessor msg_handler = +let process_message node preprocessor (msg_handler : Message.t -> bytes option) + = let open Message in let%lwt message = Networking.recv_next node in let message = preprocessor message in @@ -43,7 +44,7 @@ let process_message node preprocessor msg_handler = (Printf.sprintf "Processing message %s from %d...\n" (Message.hash_of message) message.sender.port) in *) let%lwt () = - match message.category with + match message.pollinate_category with | Response -> Lwt.return (handle_response node message) | Request -> ( (* let%lwt () = @@ -52,11 +53,11 @@ let process_message node preprocessor msg_handler = !node.address.address !node.address.port message.sender.address message.sender.port) in *) match msg_handler message with - | Some response -> - response - |> Client.create_response node message - |> Networking.send_to node - | None -> Lwt.return ()) + | Some payload -> + (* let _ = Printf.sprintf "I am inside msg_handler, found payload\n%!" in *) + Client.create_response node message payload |> Networking.send_to node + | None -> + failwith "received request without payload nor payload_signature") | Acknowledgment -> let msg_hash = Bytes.to_string message.payload in let new_addrs = @@ -128,7 +129,7 @@ let _print_logs node = 3. Run the disseminator, this includes actually sending messages to be disseminated across the network. 4. Wait 0.001 seconds before restarting the procedure. *) -let rec run node preprocessor msg_handler = +let rec run node preprocessor (msg_handler : Message.t -> bytes option) = (* Step 0 *) (* let%lwt () = print_logs node in *) (* Step 1 *) diff --git a/lib/node/server.mli b/lib/pnode/server.mli similarity index 100% rename from lib/node/server.mli rename to lib/pnode/server.mli diff --git a/lib/node/types.ml b/lib/pnode/types.ml similarity index 100% rename from lib/node/types.ml rename to lib/pnode/types.ml diff --git a/lib/node/types.mli b/lib/pnode/types.mli similarity index 100% rename from lib/node/types.mli rename to lib/pnode/types.mli diff --git a/lib/node/util.ml b/lib/pnode/util.ml similarity index 100% rename from lib/node/util.ml rename to lib/pnode/util.ml diff --git a/lib/pollinate.ml b/lib/pollinate.ml index 84d1900..ab4d85a 100644 --- a/lib/pollinate.ml +++ b/lib/pollinate.ml @@ -1,4 +1,4 @@ module Address = Common.Address module Util = Common.Util module Peer = Common.Peer -module Node = Node +module PNode = Pnode diff --git a/test/commons.ml b/test/commons.ml index 3b083e5..3b55d88 100644 --- a/test/commons.ml +++ b/test/commons.ml @@ -1,12 +1,13 @@ (** Utils function shared by the different tests modules *) module Commons = struct - open Pollinate.Node + open Pollinate.PNode open Pollinate.Util open Messages + open Message let preprocessor msg = let open Messages in - match msg.Message.category with + match msg.Message.pollinate_category with | Request -> let[@warning "-8"] (Request r) = Encoding.unpack bin_read_message msg.Message.payload in @@ -19,8 +20,7 @@ module Commons = struct let msg_handler message = let open Messages in - let open Message in - match message.category with + match message.pollinate_category with | Request -> let request = Encoding.unpack bin_read_request message.payload in let response = @@ -28,6 +28,8 @@ module Commons = struct | Ping -> Pong | Get -> Pong | Insert _ -> Success "Successfully added value to state" in - Response response |> Encoding.pack bin_writer_message |> Option.some - | _ -> None + let msg = Response response |> Encoding.pack bin_writer_message in + Some msg + | Post -> None + | _ -> failwith "unhandled in tests" end diff --git a/test/disseminator_tests.ml b/test/disseminator_tests.ml index 8b23ee2..390c865 100644 --- a/test/disseminator_tests.ml +++ b/test/disseminator_tests.ml @@ -1,53 +1,53 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Disseminator_tests = struct let node = Lwt_main.run (let%lwt node_a = - Node.init ~init_peers:[] Address.{ address = "127.0.0.1"; port = 5000 } - in + Pnode.init ~init_peers:[] + Address.{ address = "127.0.0.1"; port = 5000 } in Lwt.return node_a) let queue_insertion_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node payload |> Client.post node; - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (Pnode.Testing.broadcast_queue node)) let queue_removal_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - Client.address_of !node - |> (fun Address.{ port; _ } -> port) - |> string_of_int - |> String.to_bytes - |> Client.create_post node - |> Client.post node; + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in + let payload = + Client.address_of !node + |> (fun Address.{ port; _ } -> port) + |> string_of_int + |> String.to_bytes in + Client.create_post node payload |> Client.post node; let%lwt () = - while%lwt Node.Testing.disseminator_round node <= 10 do + while%lwt Pnode.Testing.disseminator_round node <= 10 do Lwt_unix.sleep 0.1 done in - Lwt.return (List.length (Node.Testing.broadcast_queue node)) + Lwt.return (List.length (Pnode.Testing.broadcast_queue node)) let seen_message_test () = - let _server = Node.run_server ~msg_handler:Commons.msg_handler node in - let message = + let _server = Pnode.run_server ~msg_handler:Commons.msg_handler node in + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post node in + |> String.to_bytes in + let message = Client.create_post node payload in message |> Client.post node; - Lwt.return (Node.seen node message) + Lwt.return (Pnode.seen node message) end (** Test for dissemination given a specific node. *) diff --git a/test/failure_detector_prop.ml b/test/failure_detector_prop.ml index 38f3c42..eddfde0 100644 --- a/test/failure_detector_prop.ml +++ b/test/failure_detector_prop.ml @@ -1,17 +1,17 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 3002 }) let update_peer = QCheck2.Test.make ~count:1000 ~name:"update_neighbor_status successfully update neighbor status" (pair Generators.peer_gen Generators.peer_status_gen) (fun (neighbor, neighbor_status) -> - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) neighbor in + let _ = add_neighbor (PNode.Client.peer_from !node_a) neighbor in let _ = SUT.update_peer_status node_a neighbor neighbor_status in neighbor.status = neighbor_status) diff --git a/test/failure_detector_tests.ml b/test/failure_detector_tests.ml index 9a46f83..45c7597 100644 --- a/test/failure_detector_tests.ml +++ b/test/failure_detector_tests.ml @@ -1,13 +1,13 @@ open Pollinate -open Pollinate.Node +open Pollinate.PNode open Lwt.Infix -module SUT = Pollinate.Node.Testing.Failure_detector +module SUT = Pollinate.PNode.Testing.Failure_detector let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3003 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3003 }) let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3004 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3004 }) let peer_b = Client.peer_from !node_b @@ -15,7 +15,7 @@ let failure_detection () = let open Common.Peer in let open Client in let _ = add_peer_as_is !node_a peer_b in - let _ = Pollinate.Node.Client.peer_from !node_a in + let _ = PNode.Client.peer_from !node_a in let _ = SUT.update_peer_status node_a peer_b Suspicious in (* Need to wait for the timeout to be reached An other way to do, would be to change the `last_suspicious_status` of the peer *) let%lwt _ = Lwt_unix.sleep 9.1 in @@ -24,7 +24,7 @@ let failure_detection () = let failure_detection_nothing_on_alive () = let open Common.Peer in - let _ = add_neighbor (Pollinate.Node.Client.peer_from !node_a) peer_b in + let _ = add_neighbor (PNode.Client.peer_from !node_a) peer_b in let _ = SUT.update_peer_status node_a peer_b Alive in let%lwt _ = SUT.failure_detection node_a in Lwt.return (Base.Hashtbl.length !node_a.peers = 1) diff --git a/test/gossip_tests.ml b/test/gossip_tests.ml index c1fcb18..1af3ebb 100644 --- a/test/gossip_tests.ml +++ b/test/gossip_tests.ml @@ -1,7 +1,7 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode module Gossip_tests = struct let local_address port = Address.{ address = "127.0.0.1"; port } @@ -41,16 +41,19 @@ module Gossip_tests = struct local_address 4009 ) in let%lwt node_a = - Node.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in - let%lwt node_b = Node.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in - let%lwt node_c = Node.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in - let%lwt node_d = Node.init ~init_peers:[addr_b] addr_d in - let%lwt node_e = Node.init ~init_peers:[addr_a; addr_b] addr_e in - let%lwt node_f = Node.init ~init_peers:[addr_c] addr_f in - let%lwt node_g = Node.init ~init_peers:[addr_c] addr_g in - let%lwt node_h = Node.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in - let%lwt node_i = Node.init ~init_peers:[addr_h] addr_i in - let%lwt node_j = Node.init ~init_peers:[addr_h] addr_j in + Pnode.init ~init_peers:[addr_b; addr_c; addr_e; addr_h] addr_a in + let%lwt node_b = + Pnode.init ~init_peers:[addr_a; addr_d; addr_e] addr_b in + let%lwt node_c = + Pnode.init ~init_peers:[addr_a; addr_f; addr_g] addr_c in + let%lwt node_d = Pnode.init ~init_peers:[addr_b] addr_d in + let%lwt node_e = Pnode.init ~init_peers:[addr_a; addr_b] addr_e in + let%lwt node_f = Pnode.init ~init_peers:[addr_c] addr_f in + let%lwt node_g = Pnode.init ~init_peers:[addr_c] addr_g in + let%lwt node_h = + Pnode.init ~init_peers:[addr_a; addr_i; addr_j] addr_h in + let%lwt node_i = Pnode.init ~init_peers:[addr_h] addr_i in + let%lwt node_j = Pnode.init ~init_peers:[addr_h] addr_j in Lwt.return ( node_a, node_b, @@ -96,18 +99,17 @@ module Gossip_tests = struct (* Start the server for each node in a thread *) let _ = List.map - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) nodes in (* Create the message to be posted by node *) - let message = + let payload = Client.address_of !node |> (fun Address.{ port; _ } -> port) |> string_of_int - |> String.to_bytes - |> Client.create_post ~request_ack:true node in - + |> String.to_bytes in + let message = Client.create_post node ~request_ack:true payload in (* Post the created message *) Client.post node message; @@ -122,19 +124,19 @@ module Gossip_tests = struct (* Compute the list of nodes who have seen the post. *) let list_of_seen = Hashtbl.find !node.acknowledgments (Message.hash_of message) - |> Testing.AddressSet.to_seq + |> Pnode.Testing.AddressSet.to_seq |> Seq.map (fun address -> address.Address.port) |> List.of_seq in (* Write the length of list_of_seen to a tmp log file *) - let%lwt () = - let%lwt oc = - Lwt_io.open_file - ~flags:[Unix.O_WRONLY; Unix.O_APPEND; Unix.O_CREAT] - ~mode:Lwt_io.Output "/tmp/log.txt" in - let%lwt () = - Lwt_io.write oc (Printf.sprintf "%d\n" (List.length list_of_seen)) in - Lwt_io.close oc in + (* let%lwt () = + let%lwt oc = + Lwt_io.open_file + ~flags:[Unix.O_WRONLY; Unix.O_APPEND; Unix.O_CREAT] + ~mode:Lwt_io.Output "/tmp/log.txt" in + let%lwt () = + Lwt_io.write oc (Printf.sprintf "%d\n" (List.length list_of_seen)) in + Lwt_io.close oc in *) (* End of logging code *) Lwt.return list_of_seen end diff --git a/test/networking_prop.ml b/test/networking_prop.ml index 2bc018b..48c4cd1 100644 --- a/test/networking_prop.ml +++ b/test/networking_prop.ml @@ -1,10 +1,10 @@ open QCheck2.Gen open Pollinate.Peer open Pollinate -module SUT = Pollinate.Node.Testing.Networking +module SUT = Pollinate.PNode.Testing.Networking let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 2002 }) + Lwt_main.run (PNode.init Address.{ address = "127.0.0.1"; port = 2002 }) let knuth_shuffle_size = QCheck2.Test.make ~count:1000 diff --git a/test/node_tests.ml b/test/node_tests.ml index 84a17c7..07ca0ec 100644 --- a/test/node_tests.ml +++ b/test/node_tests.ml @@ -1,19 +1,19 @@ open Lwt.Infix open Commons open Pollinate -open Pollinate.Node +open Pollinate.PNode open Pollinate.Util open Messages module Node_tests = struct (* Initializes two nodes and the related two peers *) let node_a = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3000 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3000 }) let peer_a = Client.peer_from !node_a let node_b = - Lwt_main.run (Node.init Address.{ address = "127.0.0.1"; port = 3001 }) + Lwt_main.run (Pnode.init Address.{ address = "127.0.0.1"; port = 3001 }) let peer_b = Client.peer_from !node_b @@ -23,17 +23,17 @@ module Node_tests = struct let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let get = Encoding.pack bin_writer_message (Request Get) in - let%lwt { payload = res_from_b; _ } = - Client.request node_a get peer_b.address in + let%lwt message = Client.create_request node_a peer_b.address get in + let%lwt { payload = res_from_b; _ } = Client.request node_a message in let res_from_b = Encoding.unpack bin_read_response res_from_b in - let%lwt { payload = res_from_a; _ } = - Client.request node_b get peer_a.address in + let%lwt message = Client.create_request node_b peer_a.address get in + let%lwt { payload = res_from_a; _ } = Client.request node_b message in let res_from_a = Encoding.unpack bin_read_response res_from_a in let res_from_b, res_from_a = @@ -47,12 +47,13 @@ module Node_tests = struct let open Messages in let _ = Lwt_list.map_p - (Node.run_server ~preprocessor:Commons.preprocessor + (Pnode.run_server ~preprocessor:Commons.preprocessor ~msg_handler:Commons.msg_handler) [node_a; node_b] in let ping = Encoding.pack bin_writer_message (Request Ping) in - let%lwt { payload = pong; _ } = Client.request node_a ping peer_b.address in + let%lwt message = Client.create_request node_a peer_b.address ping in + let%lwt { payload = pong; _ } = Client.request node_a message in let pong = Encoding.unpack bin_read_response pong in let pong =