grog is a cluster's protocol implementing a namespaced distributed map.
grog's name originates from the pirates's favorite beverage in the saga
of Monkey Island.
It's well known that such mixture is quite acid, enough to corrode the
mug's metal where it's poured into.
This corrosive property, implies that if you have to transport a bit of grog
from one side of the Island to the other,
you have to equip yourself with a bunch of cups and put them in your stash.
In this way, you can decant the grog when the holding mug is nearly melted.
So, a grog cluster behaves quite similar.
You can rely on the held content - the grog - as far as at least one node
keeps alive in the cluster.
As said, grog aims to implement a distributed map kept alive by an arbitrary number
of nodes over a local network.
Each node holds exactly the same data of any other node.
This means that the map is not partitioned in any way across the peers.
The only purpose of grog is to provide a shared layer, where an actor can
both read and write data without any restriction.
Data kept by grog nodes is not persisted in any way.
This means that, at a certain epoch, a grog's state exists solely when at least
one node is living.
When all nodes disappear, the state held until that moment is lost forever.
- Possibly useful when developing a distributed application.
- Immediate usage.
- Zero network configuration.
- No data definition.
- Arbitrary complex type for values.
- json as unique value's format.
- Small operation set to access and manipulate the map:
get, to get the value of a key from a mapset, to add or update the value of a key in a mapdel, to remove a key, value pair from a map
- Integrable in programs where an implementation for that language exists.
- Not designed to be used in production.
- Not meant to be efficient in space.
grog daemon and CLI is a stand alone program that can be directly executed
allowing to query and modify the map's state.
An implementation of grog daemon and CLI should try to adhere to the following usage guidelines:
SYNOPSIS
grog [-d] [-j <multicast address>] [-p <listening port>] [-l <logging type>] [-v <logging verbosity>] [get <key>] [set <key=value>] [del <key>]
OPTIONS
-d, --daemon Spawn a new daemon
-j, --join Join the cluster at specified multicast group
-p, --port Listen on the specified port
-l, --log Specify logging type [console (default), file name]
-v, --verbosity
Specify logging verbosity [off, trace, info (default), warn, err]
-n, --namespace
Specify a namespace when getting/setting a key
get Get a key's value
set Set a key with its value
del Delete a key
The following scenario describes an example of interaction between grog daemon and a custom application that uses grog.
- From
node-1you spawn a new grog daemon and contextuallyseta key with a value:
node-1$ grog -d set John='{"name":"John", "surname":"Smith", "age":30}'
updated key=John in default namespace- From
node-2yougetthe value for the given key:
node-2$ grog get John
{
"name":"John",
"surname":"Smith",
"age":30
}- Then, you write an application:
/**
Pseudo code of an application that uses an implementation of grog for a programming language
*/
import wxgb.grog
{
//we get a default constructed grog map; in this form it should be able to interact with all other daemons spawned with no specialized arguments.
var grogMap = grog.map()
//let's suppose this language supports optional chaining and nil-coaleshing operator.
let age = node.get("John")?.getAsInt("age") ?? -1
//this should print 30
print("age:\(age)")
//this should set a value in the default namespace
grogMap["Rick"] = "{'name':'Rick', 'surname':'Greene', 'age':57, 'car':'Ford Mustang'}"
}grog's network protocol uses both multicast UDP and TCP.
The protocol relies on timestamps produced by
grog nodes to serialize the actions over the maps.
Due to this, hosts's clocks are strongly required to be synched.
The protocol does not contemplate a master election between nodes.
Does not exist a node representing a source of truth.
The unique qualitative property of a node is represented
by its node id - nid.
The nid's value is set when the node starts and it contains a timestamp
with a nanosecond resolution.
It is established by the protocol that a node with a lower nid prevails
on a timestamp collision,
or when a recovery phase occurs across the cluster.
When a node B starts, it broadcasts an alive message over the
UDP multicast channel.
Supposing there is already another node A running, it will broadcast
back its alive message.
Node B thus, uses the node A alive message in order to ask
for a map's snapshot.
A snapshot is requested by a node to another node via TCP.
After the snapshot has been successfully transmitted, socket closes.
Every map's state variation will be disseminated via incremental
packets on UDP.
So, there are two main flows:
- snapshot, solely via TCP
- incremental, solely via UDP multicast
When a node starts, it performs the following steps:
- Send an alive
Amessage over the UDP multicast channel
{
type: "A", //message type, A (alive)
ts: 0, //the map's timestamp at the sending epoch of the message, zero when this is the first A message produced
nid: ${node-starting-TS}, //the node-id, it is set at the starting epoch of the node
seqno: 0, //the last sequence number produced by this node, it is incremented every time an I message is produced
address: "${ip}:${port}" //the node's ip:port where it listen for snapshot requests
}- If an
Amessage is received from an another node, then connect to obtain a snapshotSmessage. - If no
Amessages are received in useful time, then set a map's timestamp and go on with the live phase.
A node serves a snapshot when a TCP connection is established at the address specified in the A message.
The json entity containing a snapshot is streamed over the TCP channel.
The first 4 bytes received on the stream denote the length (in bytes) of the subsequent json entity.
${message-length} //4 bytes (big endian), containing the subsequent json message length
{
type: "S", //message type, S (snapshot)
ts: ${map-TS}, //the map's timestamp at the sending epoch of the message
nid: ${node-starting-TS}, //the node-id, it is set at the starting epoch of the node
seqnos: [{nid: ${nid-0},
seqno :${seqno-0}},
{nid: ${nid-1},
seqno :${seqno-1}},
{nid: ${nid-2},
seqno :${seqno-2}},
...
], //an array containing all the sequence numbers of every node contributed updating the map
snapshot: {...} //the map's snapshot data, see below for format's details
}The snapshot field in the S message has the following format:
{
ts: ${map-TS}, //the map's timestamp, it's the last event timestamp
snapshot-ns: [ //an array of namespaced items
{
ns: ${namespace-0}, //the item namespace
seqnos: [ //an array of pairs K:V, each one falling into the same parent's namespace
{
ts: ${event-TS-0}, //each K:V pair has associated an event's TS
nid: ${nid-0}, //each K:V pair has associated the node id that produced the event
key: ${K-0},
val: ${V-0}
},
{
ts: ${event-TS-1},
nid: ${nid-1},
key: ${K-1},
val: ${V-1}
},
...
]
},
...
]
}- A node receiving a snapshot is required to:
- Collect all the receiving, incremental
Imessages sent by other nodes during this phase. - After have received the snapshot, process all the collected
Imessages with the Incremental message rules.
- Collect all the receiving, incremental
If a node receiving a snapshot experiences a disconnection from the serving
node before
the entire stream has been transferred, then it is required to retry the
entire procedure.
If no node is available to serve a snapshot, then set a map's timestamp and
go on with the live phase.
After having successfully completed the initial phase, a node can transit into
the live phase.
This is the regular final state of a running node.
When in live phase, a node must:
- process all the incoming
Imessages. - respond to all the incoming TCP connections and serve the snapshots.
- send unsolicited
Amessages with a frequency of 20 seconds. - send an
Amessage in reply to receivedAmessages withts = 0(new nodes).
When the node is part of an application, it processes the changes requested
by the applicative layer.
In this case, the node also actively produces I messages.
An I message has the following structure:
{
type: "I", //message type, I (incremental)
ts: ${I-TS}, //the I timestamp, it denotes the timestamp of this event
nid: ${node-starting-TS}, //the node-id, it is set at the starting epoch of the node
seqno: ${last-seqno}, //the last sequence number produced by this node
op: ${operation-code}, //the operation associated with the message: set or del
ns: ${namespace}, //item's namespace
key: ${K}, //item's key
val: ${V} //item's value
}When applying incremental messages a node is required to adhere the following rules:
- Check if the received
Imessage is in-sequence in respect of its ownnidflow.- Discard all received messages with a
seqnolesser than the expected. - Start the recovery phase when receiving a message with a
seqnogreater than the expected.
- Discard all received messages with a
- When the received message is in-sequence, lookup in the map for an entry with the given key:
K:- If an entry doesn't exists, then apply the incoming operation.
- If an entry exists, evaluate the entry's
ts:- If the message's
tsis lesser than entry'sts, then discard the incoming operation. - If the message's
tsis greater than entry'sts, then apply the incoming operation. - If the message's
tsis equal to the entry'sts(timestamp collision), then evaluate the entry'snid:- Apply the incoming operation only if the message's
nidis lesser than the entry'snid.
- Apply the incoming operation only if the message's
- If the message's
This is a special state that occurs when certain erroneous
conditions are detected on a node.
A recovery phase can resolve inside a node or, at worst,
can potentially affect the state of the entire cluster.
Recall that every node can independently produce data and
there is no source of truth.
With these premises, say if it is more correct to save a state
rather than another, is a total arbitrary matter.
Because a choice must be done, when a recovery state initiates,
it was chosen to make prevail the state of an older node in the cluster.
Erroneous conditions can occur due to UDP unreliability.
Indeed, a node can:
- Receive packets out of sequence.
- Lose packets.
A node detects a gap when it receives an OOS packet; this is possible with:
Awith aseqnohigher than the expected.Iwith aseqnohigher than the expected.
When a gap is detected with an I message, a node is allowed to wait
a grace period of 100 ms trying to receive the missing packet(s).
If the gap resolves spontaneously, thus the missing packed are received and reordered,
no further actions are required.
If instead, a gap is detected with an A message or the grace period
elapses, the node must initiate a recovery phase.
As said, the protocol chooses to prevail the state of an older node.
So, a node starting a recovery phase, evaluates its nid against the
nid of the received out-of-sequence packet.
When:
- The node's
nidis greater than the counterpart'snid:- Trigger a snapshot request, to be done after 2 seconds, against the counterpart.
- If, during the countdown, an OOS
Amessage with an even lessernidis received, then reset the countdown and push-front a new snapshot request with the new counterpart. - Requests must be try in order, from the smallest
nidto the greatest; the process stops as soon as a request completes successfully.
- The node's
nidis lesser than the counterpart'snid- Broadcast an artificial OOS
Amessage and update the internalseqnoaccordingly.
- Broadcast an artificial OOS
There are 4 nodes forming a cluster: N1, N2, N3 and N4.
N1 is the oldest, N2 has spawned after N1 and so on.
At a certain epoch, N3 produces an I message that is not received by N4.
When N3 produces an A message, N4 detects a gap because the seqno in the message is
not the expected one.
[1] [2]
N1 <--- N1 <--- [ok]
| |
N2 <--- N2 <--- [ok]
| |
N3 ---> (I) N3 ---> (A)
| |
N4 X-- N4 <--- [gap detected]
Because N4's nid is greater than N3's nid,
N4 triggers a request for a snapshot to N3 in 2 seconds.
[3] [4]
N1 N1
N2 N2
N3 <--< N3 >--> (S)
| [TCP] |
N4 >--> N4 <--<
N4 is now recovered.
At a certain epoch, N3 produces an I message that is not received by N2.
When N3 produces an A message, N2 detects a gap because the seqno in the message is
not the expected one.
[1] [2]
N1 <--- N1 <--- [ok]
| |
N2 X-- N2 <--- [gap detected]
| |
N3 ---> (I) N3 ---> (A)
| |
N4 <--- N4 <--- [ok]
Because N2's nid is lesser than N3's nid, N2 broadcast an artificial OOS A message.
[3]
N1 <--- [gap detected]
|
N2 ---> (OOS A)
|
N3 <--- [gap detected]
|
N4 <--- [gap detected]
When N2 produces an OOS A message, all the other nodes detect a gap.
Because N1's nid is lesser than N2's nid, N1 broadcast an artificial OOS A message.
Because N3's nid is greater than N2's nid,
N3 triggers a request for a snapshot to N2 in 2 seconds.
Because N4's nid is greater than N2's nid,
N4 triggers a request for a snapshot to N2 in 2 seconds.
[4]
N1 ---> (OOS A)
|
N2 <--- [gap detected]
|
N3 <--- [gap detected] [pending snapshot request to N2]
|
N4 <--- [gap detected] [pending snapshot request to N2]
When N1 produces an OOS A message, all the other nodes detect a gap.
Because N2's nid is greater than N1's nid,
N2 triggers a request for a snapshot to N1 in 2 seconds.
Because N3's nid is greater than N1's nid,
N3 triggers a request for a snapshot to N1 in 2 seconds.
Because N4's nid is greater than N1's nid,
N4 triggers a request for a snapshot to N1 in 2 seconds.
[5] [6]
N1 <------- N1 >-------
| | | | | |
N2 >--> | | N2 <--< | |
| | [TCP] | | (S)x3
N3 >----> | N3 <----< |
| |
N4 >------> N4 <------<
N2 is now recovered.
