Skip to content
30 changes: 18 additions & 12 deletions sharding/src/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Client {
port: String,
}

/// Implementation of Client
impl Client {
/// Creates a new Client node with the given port
pub fn new(ip: &str, port: &str) -> Self {
Expand All @@ -51,6 +52,7 @@ impl Client {
}
}

/// Tries to get the router info from the other nodes. If found, it connects to the router and returns the TcpStream.
pub fn get_router_info(config: NodesConfig) -> Option<TcpStream> {
let mut candidate_ip;
let mut candidate_port;
Expand Down Expand Up @@ -151,6 +153,7 @@ impl Client {
}
}

/// Handles the router id message and returns the TcpStream if the router is found.
fn handle_router_id_message(response_message: message::Message) -> Option<TcpStream> {
if let Some(node_info) = response_message.get_data().node_info {
let node_ip = node_info.ip.clone();
Expand All @@ -168,7 +171,7 @@ impl Client {
println!(
"{color_bright_green}Connected to router stream {}:{}{style_reset}",
node_ip,
connections_port.to_string()
connections_port
);
return Some(router_stream);
}
Expand All @@ -178,8 +181,9 @@ impl Client {
None
}

/// Handles the received message from a connection.
fn handle_received_message(buffer: &mut [u8]) {
let message_string = String::from_utf8_lossy(&buffer);
let message_string = String::from_utf8_lossy(buffer);
let response_message = match message::Message::from_string(&message_string) {
Ok(message) => message,
Err(_) => {
Expand All @@ -199,18 +203,22 @@ impl Client {
}
}

/// Implementation of Node for Client
impl NodeRole for Client {
fn backend(&self) -> Arc<Mutex<postgres::Client>> {
panic!("Client node does not have a backend");
}

/// Sends query to the router and returns the response.
fn send_query(&mut self, query: &str) -> Option<String> {
if query == "whoami;" {
println!("> I am Client: {}:{}\n", self.ip, self.port);
println!(
"{color_bright_green}> I am Client: {}:{}{style_reset}\n",
self.ip, self.port
);
return None;
}

println!("Sending query from client");
let message =
message::Message::new_query(Some(self.client_info.clone()), query.to_string());

Expand All @@ -219,7 +227,7 @@ impl NodeRole for Client {
query
);

// Intentar obtener el stream actual
// Get current stream
let mut stream = match self.router_postgres_client.stream.lock() {
Ok(stream) => stream,
Err(e) => {
Expand All @@ -228,17 +236,15 @@ impl NodeRole for Client {
}
};

println!("{color_bright_blue}Stream locked{style_reset}");

// Intentar enviar el mensaje y reconectar si falla
// Try to send the message. If it fails, reconnect to a new router
if let Err(e) = stream.write_all(message.to_string().as_bytes()) {
eprintln!(
"{color_bright_red}Failed to send the query: {:?}{style_reset}",
e
);
eprintln!("Reconnecting to new router...");
drop(stream);
// Obtener un nuevo router y actualizar el canal
// Get a new router and update the stream
let new_stream = match Self::get_router_info(get_nodes_config()) {
Some(new_stream) => new_stream,
None => {
Expand All @@ -255,10 +261,10 @@ impl NodeRole for Client {

println!("{color_bright_blue}Query sent to router{style_reset}");

// Preparar el buffer de respuesta
// Prepare the response buffer
let mut buffer: [u8; MAX_PAGE_SIZE] = [0; MAX_PAGE_SIZE];

// Intentar leer la respuesta y reconectar si falla
// Try to read the response and reconnect if it fails
match stream.read(&mut buffer) {
Ok(chars) if chars > 0 => {
Client::handle_received_message(&mut buffer);
Expand All @@ -269,7 +275,7 @@ impl NodeRole for Client {
eprintln!("Reconnecting to new router...");
drop(stream);

// Obtener un nuevo router y actualizar el canal
// Get a new router and update the stream
if let Some(new_stream) = Self::get_router_info(get_nodes_config()) {
self.router_postgres_client = Channel {
stream: Arc::new(Mutex::new(new_stream)),
Expand Down
85 changes: 33 additions & 52 deletions sharding/src/node/memory_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use inline_colorization::*;
use libc::statvfs;
use std::ffi::CString;
use std::io;
use sysinfo::System;
use sysinfo::Disks;

/// This struct represents the Memory Manager in the distributed system.
/// It will manage the memory of the node and will be used to determine if the node should accept new requests.
Expand All @@ -13,22 +10,20 @@ pub struct MemoryManager {
}

impl MemoryManager {
/// Creates a new MemoryManager.
pub fn new(unavailable_memory_perc: f64) -> Self {
let available_memory_perc =
match Self::get_available_memory_percentage(unavailable_memory_perc) {
Some(perc) => perc,
None => panic!("[MemoryManager] Failed to get available memory"),
};
println!(
"{color_blue}[Memory Manager] Memory Threshold: {:?}%{style_reset}",
unavailable_memory_perc
);
MemoryManager {
unavailable_memory_perc,
available_memory_perc,
}
}

/// Updates the available memory percentage.
pub fn update(&mut self) -> Result<(), io::Error> {
self.available_memory_perc =
match Self::get_available_memory_percentage(self.unavailable_memory_perc) {
Expand All @@ -43,55 +38,41 @@ impl MemoryManager {
Ok(())
}

/// Returns the available memory percentage.
fn get_available_memory_percentage(unavailable_memory_perc: f64) -> Option<f64> {
if unavailable_memory_perc == 100.0 {
return Some(0.0);
}

// Create a System object
let mut sys = System::new_all();

// Refresh system data
sys.refresh_all();

// Get the root directory information
let path = match CString::new("/") {
Ok(path) => path,
Err(_) => return None,
};
let mut stat: statvfs = unsafe { std::mem::zeroed() };

if unsafe { statvfs(path.as_ptr(), &mut stat) } == 0 {
let total_space = ((stat.f_blocks as u64) * stat.f_frsize) / 1024;
let available_space = ((stat.f_bavail as u64) * stat.f_frsize) / 1024;

// if percentage is greater than 1, it means that the total of space used exceeds the threshold.
// If so, return 0
let total = total_space as f64;
let threshold_size = total * (unavailable_memory_perc / 100.0);

if threshold_size > available_space as f64 {
println!(
"{color_red}[Memory Manager] Memory Threshold Exceeded Available Space{style_reset}"
);
return Some(0.0);
}

let usable_available_space = available_space as f64 - threshold_size;
let usable_total_space = total - threshold_size / 100.0;

let percentage = usable_available_space / usable_total_space * 100.0;
if percentage > 100.0 {
return Some(0.0);
}
println!(
"{color_blue}[Memory Manager] Available Memory: {:?} %{style_reset}",
percentage
);
Some(percentage)
} else {
None
// Create a Disk object
let disks = Disks::new_with_refreshed_list();

let mut total_space: u64 = 0;
let mut available_space: u64 = 0;

// Get the total and available space of the disk
for disk in &disks {
total_space += disk.total_space();
available_space += disk.available_space();
}

// if percentage is greater than 1, it means that the total of space used exceeds the threshold.
// If so, return 0
let total = total_space as f64;
let threshold_size = total * (unavailable_memory_perc / 100.0);

if threshold_size > available_space as f64 {
return Some(0.0);
}

let usable_available_space = available_space as f64 - threshold_size;
let usable_total_space = total - threshold_size / 100.0;

let percentage = usable_available_space / usable_total_space * 100.0;
if percentage > 100.0 {
return Some(0.0);
}
Some(percentage)
}
}

Expand All @@ -105,7 +86,7 @@ mod tests {
let unavailable_memory_perc = 0.0;
let available_memory_perc =
MemoryManager::get_available_memory_percentage(unavailable_memory_perc);
assert_eq!(available_memory_perc.is_some(), true);
assert!(available_memory_perc.is_some());
}

#[test]
Expand Down
26 changes: 15 additions & 11 deletions sharding/src/node/messages/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl fmt::Debug for Message {
impl Message {
// --- Constructors ---

/// Creates a new Message with the InitConnection type
pub fn new_init_connection(node_info: NodeInfo) -> Self {
Message {
message_type: MessageType::InitConnection,
Expand All @@ -58,6 +59,7 @@ impl Message {
}
}

/// Creates a new Message with the AskMemoryUpdate type
pub fn new_ask_memory_update() -> Self {
Message {
message_type: MessageType::AskMemoryUpdate,
Expand All @@ -68,6 +70,7 @@ impl Message {
}
}

/// Creates a new Message with the MemoryUpdate type
pub fn new_memory_update(payload: f64, max_ids: TablesIdInfo) -> Self {
Message {
message_type: MessageType::MemoryUpdate,
Expand All @@ -78,6 +81,7 @@ impl Message {
}
}

/// Creates a new Message with the Agreed type
pub fn new_agreed(memory_percentage: f64, max_ids: TablesIdInfo) -> Self {
Message {
message_type: MessageType::Agreed,
Expand All @@ -88,6 +92,7 @@ impl Message {
}
}

/// Creates a new Message with the Denied type
pub fn new_denied() -> Self {
Message {
message_type: MessageType::Denied,
Expand All @@ -98,6 +103,7 @@ impl Message {
}
}

/// Creates a new Message with the GetRouter type
pub fn new_get_router() -> Self {
Message {
message_type: MessageType::GetRouter,
Expand All @@ -108,6 +114,7 @@ impl Message {
}
}

/// Creates a new Message with the RouterId type
pub fn new_router_id(node_info: NodeInfo) -> Self {
Message {
message_type: MessageType::RouterId,
Expand All @@ -118,6 +125,7 @@ impl Message {
}
}

/// Creates a new Message with the HelloFromNode type
pub fn new_hello_from_node(node_info: NodeInfo) -> Self {
Message {
message_type: MessageType::HelloFromNode,
Expand All @@ -128,6 +136,7 @@ impl Message {
}
}

/// Creates a new Message with the NoRouterData type
pub fn new_no_router_data() -> Self {
Message {
message_type: MessageType::NoRouterData,
Expand All @@ -138,6 +147,7 @@ impl Message {
}
}

/// Creates a new Message with the Query type
pub fn new_query(sender_info: Option<NodeInfo>, query: String) -> Self {
Message {
message_type: MessageType::Query,
Expand All @@ -148,6 +158,7 @@ impl Message {
}
}

/// Creates a new Message with the QueryResponse type
pub fn new_query_response(query_response: String) -> Self {
Message {
message_type: MessageType::QueryResponse,
Expand All @@ -160,6 +171,7 @@ impl Message {

// --- Method to get the data ---

/// Get the data from the message
pub fn get_data(&self) -> MessageData {
match self.message_type {
MessageType::InitConnection | MessageType::RouterId | MessageType::HelloFromNode => {
Expand All @@ -175,11 +187,6 @@ impl Message {
None => TablesIdInfo::new(),
};
if let Some(payload) = self.payload {
println!(
"Payload: {}, Max Ids: {}",
payload,
max_ids.convert_to_string()
);
MessageData::new_payload(payload, max_ids)
} else {
MessageData::new_none()
Expand Down Expand Up @@ -263,7 +270,7 @@ impl Message {
// Query Data
result.push(' ');
if let Some(query) = &self.query_data {
result.push_str(&query);
result.push_str(query);
} else {
result.push_str("None");
}
Expand Down Expand Up @@ -324,10 +331,7 @@ impl Message {
query.push_str(part);
query.push(' ');
}
match query.split(';').next() {
Some(query) => Some(query.to_string()),
None => None,
}
query.split(';').next().map(|query| query.to_string())
}
None => None,
};
Expand Down Expand Up @@ -594,7 +598,7 @@ mod tests {
"INIT_CONNECTION 0.5 departments:5,employees:3 None None\n",
];

assert!(options.contains(&&message.to_string().as_str()));
assert!(options.contains(&message.to_string().as_str()));
}

#[test]
Expand Down
Loading
Loading