From 6499bc512df4ac1b283c190a153906bee89b5b09 Mon Sep 17 00:00:00 2001 From: NContinanza Date: Mon, 18 Nov 2024 15:43:14 -0300 Subject: [PATCH 01/13] Remove debug logs --- sharding/src/node/client.rs | 6 --- sharding/src/node/memory_manager.rs | 12 ------ sharding/src/node/messages/message.rs | 5 --- sharding/src/node/node.rs | 42 ++++-------------- sharding/src/node/router.rs | 62 +++------------------------ sharding/src/node/shard.rs | 39 +---------------- sharding/src/node/shard_manager.rs | 15 ------- sharding/src/psql/psql.rs | 3 -- sharding/tests/integration_tests.rs | 2 - 9 files changed, 15 insertions(+), 171 deletions(-) diff --git a/sharding/src/node/client.rs b/sharding/src/node/client.rs index cee48be0468b6..e419fa571ae94 100644 --- a/sharding/src/node/client.rs +++ b/sharding/src/node/client.rs @@ -174,7 +174,6 @@ impl NodeRole for Client { return None; } - println!("Sending query from client"); let message = message::Message::new_query(Some(self.client_info.clone()), query.to_string()); @@ -192,8 +191,6 @@ impl NodeRole for Client { } }; - println!("{color_bright_blue}Stream locked{style_reset}"); - // Intentar enviar el mensaje y reconectar si falla if let Err(e) = stream.write_all(message.to_string().as_bytes()) { eprintln!( @@ -219,13 +216,10 @@ impl NodeRole for Client { eprintln!("Failed to send query after reconnecting: {:?}", e); return None; } - println!("{color_bright_blue}Query sent to new router{style_reset}"); } else { eprintln!("No valid router found during reconnection."); return None; } - } else { - println!("{color_bright_blue}Query sent to router{style_reset}"); } // Preparar el buffer de respuesta diff --git a/sharding/src/node/memory_manager.rs b/sharding/src/node/memory_manager.rs index 13e8058faca49..91221f27de23b 100644 --- a/sharding/src/node/memory_manager.rs +++ b/sharding/src/node/memory_manager.rs @@ -1,4 +1,3 @@ -use inline_colorization::*; use libc::statvfs; use std::ffi::CString; use std::io; @@ -19,10 +18,6 @@ impl MemoryManager { Some(perc) => perc, None => panic!("[MemoryManager] Failed to get available memory"), }; - println!( - "{color_blue}[Memory Manager] Memory Threshold: {:?}%{style_reset}", - unavailable_memory_perc - ); MemoryManager { unavailable_memory_perc, available_memory_perc, @@ -68,9 +63,6 @@ impl MemoryManager { let threshold_size = total * (unavailable_memory_perc / 100.0); if threshold_size > available_space as f64 { - println!( - "{color_red}[Memory Manager] Memory Threshold Exceeded Available Space{style_reset}" - ); return Some(0.0); } @@ -81,10 +73,6 @@ impl MemoryManager { if percentage > 100.0 { return Some(0.0); } - println!( - "{color_blue}[Memory Manager] Available Memory: {:?} %{style_reset}", - percentage - ); Some(percentage) } else { None diff --git a/sharding/src/node/messages/message.rs b/sharding/src/node/messages/message.rs index 7372538481fa7..156dd667fe8b3 100644 --- a/sharding/src/node/messages/message.rs +++ b/sharding/src/node/messages/message.rs @@ -175,11 +175,6 @@ impl Message { None => TablesIdInfo::new(), }; if let Some(payload) = self.payload { - println!( - "Payload: {}, Max Ids: {}", - payload, - max_ids.convert_to_string() - ); MessageData::new_payload(payload, max_ids) } else { MessageData::new_none() diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index 06b8711db1702..642c7e223d8a2 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -4,6 +4,7 @@ use crate::node::client::Client; use crate::utils::node_config::get_nodes_config_raft; use crate::utils::node_config::INIT_HISTORY_FILE_PATH; use crate::utils::queries::print_rows; +use inline_colorization::*; use postgres::Row; use std::ffi::CStr; use std::fmt::Error; @@ -136,10 +137,8 @@ pub extern "C" fn InitNodeInstance(node_type: NodeType, port: *const i8) { panic!("Received an invalid UTF-8 string"); } }; - println!("found_port: {}", found_port); } let ip = "127.0.0.1"; - println!("before init_shard ip: {}, port: {}", ip, found_port); new_node_instance(node_type.clone(), ip, &found_port); // If the node is a client, it does not need to run raft. Thus, it can return after initializing @@ -217,7 +216,9 @@ fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { }; match change_role(role.to_owned(), transmitter.clone()) { Ok(_) => { - println!("Role changing finished succesfully"); + println!( + "{color_bright_green}Role changing finished succesfully{color_reset}" + ); } Err(_) => { println!("Error could not change role to {:?}", role); @@ -225,54 +226,40 @@ fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { } } Err(_) => { - // println!("Error receiving from raft transmitter: {:?}", e); + // Do nothing } } }); } fn change_role(new_role: NodeType, transmitter: Sender) -> Result<(), Error> { - println!("Changing role to {:?}", new_role); - if new_role == NodeType::Client { - println!("NodeRole cannot be changed to Client, it is not a valid role"); + eprintln!("NodeRole cannot be changed to Client, it is not a valid role"); return Err(Error); }; - println!("Trying to get node instance"); let node_instance = get_node_instance(); - println!("AFTER get node instance"); let current_instance = &mut node_instance.instance; - println!("AFTER current instance"); - if node_instance.node_type == new_role { - println!("NodeRole is already {:?}", new_role); confirm_role_change(transmitter); return Ok(()); } - println!("node type changes"); - let ip = node_instance.ip.clone(); let port = node_instance.port.clone(); - println!("ip: {}, port: {}", ip, port); - // Stop current instance match current_instance.as_mut() { Some(instance) => { - println!("Stopping current instance"); instance.stop(); } None => { - println!("Node instance not initialized"); + eprintln!("Node instance not initialized"); return Err(Error); } } - println!("AFTER STOPPING current instance"); - match new_role { NodeType::Router => { init_router(&ip, &port); @@ -281,12 +268,11 @@ fn change_role(new_role: NodeType, transmitter: Sender) -> Result<(), Erro init_shard(&ip, &port); } _ => { - println!("NodeRole can only be changed to Router or Shard"); + eprintln!("NodeRole can only be changed to Router or Shard"); return Err(Error); } } - println!("AFTER CHANGING current instance"); confirm_role_change(transmitter); Ok(()) } @@ -307,9 +293,6 @@ fn new_node_instance(node_type: NodeType, ip: &str, port: &str) { } fn init_router(ip: &str, port: &str) { - // sleep for 5 seconds to allow the stream to be ready to read - //thread::sleep(std::time::Duration::from_secs(5)); - let router = Router::new(ip, port); unsafe { @@ -326,14 +309,10 @@ fn init_router(ip: &str, port: &str) { let port_clone = port.to_string(); let _handle = thread::spawn(move || { Router::wait_for_incomming_connections(&shared_router, ip_clone, port_clone); - println!("Router comes back from wait_for_incomming_connections"); }); - - println!("Router node initializes"); } fn init_shard(ip: &str, port: &str) { - println!("Sharding node initializing"); let shard = Shard::new(ip, port); unsafe { @@ -350,14 +329,10 @@ fn init_shard(ip: &str, port: &str) { let port_clone = port.to_string(); let _handle = thread::spawn(move || { Shard::accept_connections(shared_shard, ip_clone, port_clone); - println!("Shard comes back from accept_connections"); }); - - println!("Sharding node initializes"); } fn init_client(ip: &str, port: &str) { - println!("Client node initializing"); unsafe { NODE_INSTANCE = Some(NodeInstance::new( Box::new(Client::new(ip, port)), @@ -366,5 +341,4 @@ fn init_client(ip: &str, port: &str) { NodeType::Client, )); } - println!("Client node initializes"); } diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index d9b909ed00323..852e601ae2d6e 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -42,7 +42,6 @@ pub struct Router { impl Router { /// Creates a new Router node with the given port and ip, connecting it to the shards specified in the configuration file. pub fn new(ip: &str, port: &str) -> Self { - println!("Inside new::Router"); Router::initialize_router_with_connections(ip, port) } @@ -53,12 +52,8 @@ impl Router { waiting_port: String, ) { let port = waiting_port.parse::().unwrap() + 1000; - println!("Attempting to bind listener to port: {}", port); - let listener = TcpListener::bind(format!("{}:{}", ip, port)).unwrap(); - println!("wait_for_incomming_connections"); - loop { let stopped = { let router = match shared_router.lock() { @@ -80,7 +75,6 @@ impl Router { }; if stopped { - println!("Stopped is true"); drop(listener); return; } @@ -101,7 +95,6 @@ impl Router { let stream_clone = Arc::clone(&shareable_stream); thread::spawn(move || { - println!("Inside thread in wait_for_client"); Router::listen(&router_clone, &stream_clone); }); } @@ -133,7 +126,6 @@ impl Router { }; if stopped { - println!("Stopped is true"); return; } @@ -235,7 +227,6 @@ impl Router { } fn handle_hello_from_node_message(&mut self, message: &Message) -> Option { - println!("Received HelloFromNode message"); let node_info = match message.get_data().node_info { Some(node_info) => node_info, None => { @@ -286,7 +277,6 @@ impl Router { fn configure_connections(&mut self) { let config = get_nodes_config(); for shard in config.nodes { - println!("Configuring connection to shard: {:?}", shard); if (shard.ip == self.ip.as_ref()) && (shard.port == self.port.as_ref()) { self.name = shard.name.clone(); continue; @@ -304,19 +294,15 @@ impl Router { .set_health_connection(node_ip.as_str(), node_port.as_str()) .is_err() { - println!("Failed to connect to node: {}", node.name); + eprintln!("Failed to connect to node: {}", node.name); return; } - println!("Connecting to ip {} and port: {}", node_ip, node_port); - let Ok(shard_client) = connect_to_node(&node_ip, &node_port) else { - println!("Failed to connect to port: {node_port}"); + eprintln!("Failed to connect to port: {node_port}"); return; }; - println!("CONNECTED to ip {} and port: {}", node_ip, node_port); - self.save_shard_client(node_port.to_string(), shard_client); } @@ -329,7 +315,6 @@ impl Router { /// Sets the health_connection to the shard with the given ip and port, initializing the communication with a handshake between the router and the shard. fn set_health_connection(&mut self, node_ip: &str, node_port: &str) -> Result<(), Error> { let Ok(health_connection) = Router::get_shard_channel(node_ip, node_port) else { - println!("Failed to create health-connection to port: {node_port}"); return Err(Error); }; @@ -361,13 +346,10 @@ impl Router { name: self.name.clone(), }; let update_message = Message::new_init_connection(node_info); - println!("Sending message to shard: {update_message:?}"); let message_string = update_message.to_string(); stream.write_all(message_string.as_bytes()).unwrap(); - println!("Waiting for response from shard"); - let response: &mut [u8] = &mut [0; 1024]; // Wait for timeout and read response @@ -396,18 +378,15 @@ impl Router { self.handle_memory_update_message(&response_message.clone(), node_port) } _ => { - println!("{color_red}Shard {node_port} denied the connection{style_reset}"); + eprintln!("{color_red}Shard {node_port} denied the connection{style_reset}"); false } } } fn handle_agreed_message(&mut self, message: &Message, node_port: &str) -> bool { - println!("{color_bright_green}Shard {node_port} accepted the connection{style_reset}"); let memory_size = message.get_data().payload.unwrap(); let max_ids_info = message.get_data().max_ids.unwrap(); - println!("{color_bright_green}Memory size: {memory_size}{style_reset}"); - println!("{color_bright_green}Max Ids for Shard: {max_ids_info:?}{style_reset}"); self.save_shard_in_manager(memory_size, node_port, max_ids_info); true } @@ -415,10 +394,6 @@ impl Router { fn handle_memory_update_message(&mut self, message: &Message, node_port: &str) -> bool { let memory_size = message.get_data().payload.unwrap(); let max_ids_info = message.get_data().max_ids.unwrap(); - println!( - "{color_bright_green}Shard {node_port} updated its memory size to {memory_size}{style_reset}" - ); - println!("{color_bright_green}Max Ids for Shard: {max_ids_info:?}{style_reset}"); self.update_shard_in_manager(memory_size, node_port, max_ids_info); true } @@ -428,8 +403,6 @@ impl Router { let mut shard_manager = self.shard_manager.as_ref().clone(); shard_manager.add_shard(memory_size, shard_id.to_string()); shard_manager.save_max_ids_for_shard(shard_id.to_string(), max_ids); - println!("{color_bright_green}Shard {shard_id} added to ShardManager{style_reset}"); - println!("Shard Manager: {shard_manager:?}"); } /// Updates the shard in the `ShardManager` with the given memory size and shard id. @@ -437,14 +410,11 @@ impl Router { let mut shard_manager = self.shard_manager.as_ref().clone(); shard_manager.update_shard_memory(memory_size, shard_id.to_string()); shard_manager.save_max_ids_for_shard(shard_id.to_string(), max_ids); - println!("{color_bright_green}Shard {shard_id} updated in ShardManager{style_reset}"); - println!("Shard Manager: {shard_manager:?}"); } /// Establishes a health connection with the node with the given ip and port, returning a Channel. fn get_shard_channel(node_ip: &str, node_port: &str) -> Result { let port = node_port.parse::().unwrap() + 1000; - println!("Attempting to connect to port: {}", port); match TcpStream::connect(format!("{node_ip}:{port}")) { Ok(stream) => { println!( @@ -454,12 +424,7 @@ impl Router { stream: Arc::new(Mutex::new(stream)), }) } - Err(e) => { - println!( - "{color_red}Error establishing health connection with {node_ip}:{port}. Error: {e:?}{style_reset}" - ); - Err(e) - } + Err(e) => Err(e), } } @@ -470,13 +435,10 @@ impl Router { /// Returns the query formatted if needed (if there's a 'WHERE ID=' clause, offset might need to be removed) fn get_data_needed_from(&mut self, query: &str) -> (Vec, bool, String) { if let Some(id) = get_id_if_exists(query) { - println!("ID found in query: {id}"); return self.get_specific_shard_with(id, query); } - println!("ID NOT FOUND in query."); if query_is_insert(query) { - println!("Query is INSERT"); let shard = match self.shard_manager.peek() { Some(shard) => shard, None => { @@ -503,8 +465,6 @@ impl Router { ); }; - println!("Table name: {table_name}"); - for shard_id in self.shards.lock().unwrap().keys() { let Some(max_id) = self .shard_manager @@ -525,7 +485,6 @@ impl Router { } } - println!("ID not found in any shard"); return ( self.shards.lock().unwrap().keys().cloned().collect(), query_affects_memory_state(query), @@ -569,12 +528,8 @@ impl NodeRole for Router { return None; } - println!("Router send_query called with query: {received_query:?}"); - let (shards, is_insert, query) = self.get_data_needed_from(received_query); - println!("Shards: {shards:?}, is_insert: {is_insert}, query: {query}"); - // If there are no shards available, the router uses its own backend to execute the query if shards.is_empty() { let response = match self.send_query_to_backend(received_query) { @@ -639,7 +594,6 @@ impl NodeRole for Router { }; let response = if query_is_select(&query) && !responses.is_empty() { - println!("Query is SELECT and shards_responses is not empty"); self.format_response(responses, &query) } else { let rows_lock = match rows.lock() { @@ -761,9 +715,6 @@ impl Router { } fn send_query_to_backend(&mut self, query: &str) -> Option { - println!( - "{color_bright_green}Sending query to the router database: ({query}){style_reset}" - ); let rows = self.get_rows_for_query(query)?; let response = format_rows_without_offset(rows); Some(response) @@ -780,7 +731,7 @@ impl Router { fn redistribute_data(&mut self) { if !self.backend_has_data() { - println!("No data found in backend. Skipping redistribution."); + eprintln!("No data found in backend. Skipping redistribution."); return; } @@ -793,7 +744,7 @@ impl Router { }; if shards.is_empty() { - println!("No shards found to redistribute data. Holding on to data until shards are available."); + eprintln!("No shards found to redistribute data. Holding on to data until shards are available."); return; } @@ -920,7 +871,6 @@ impl Router { for table in tables { let drop_query = format!("DROP TABLE IF EXISTS {}", table); let _ = self.get_rows_for_query(&drop_query); - println!("Dropped table {}", table); } } } diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index a407f0f37a453..9a4d2d2faa9e6 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -49,18 +49,10 @@ impl Shard { /// Creates a new Shard node in the given port. #[must_use] pub fn new(ip: &str, port: &str) -> Self { - println!("Creating a new Shard node in port: {port}"); - println!("Connecting to the database in port: {port}"); - let backend: PostgresClient = connect_to_node(ip, port).unwrap(); let memory_manager = Self::initialize_memory_manager(); - println!( - "{color_blue}[Shard] Available Memory: {:?} %{style_reset}", - memory_manager.available_memory_perc - ); - let name = match find_name_for_node(ip.to_string(), port.to_string()) { Some(name) => name, None => { @@ -94,8 +86,6 @@ impl Shard { } pub fn look_for_sharding_network(ip: &str, port: &str, name: &str) { - println!("Checking if there's a sharding network ..."); - let config = get_nodes_config(); let mut candidate_ip; let mut candidate_port; @@ -130,7 +120,6 @@ impl Shard { port: port.to_string(), name: name.to_string(), }); - println!("{color_bright_green}Sending HelloFromNode message to {candidate_ip}:{candidate_port}{style_reset}"); candidate_stream .write_all(hello_message.to_string().as_bytes()) @@ -140,7 +129,6 @@ impl Shard { pub fn accept_connections(shared_shard: Arc>, ip: String, accepting_port: String) { let port = accepting_port.parse::().unwrap() + 1000; - println!("Attempting to bind listener to port: {}", port); let listener = TcpListener::bind(format!("{}:{}", ip, port)).unwrap(); @@ -172,7 +160,6 @@ impl Shard { }; if *must_stop { - println!("{color_red}STOPPED ACCEPT CONNECTIONS{style_reset}"); drop(listener); handles .into_iter() @@ -200,9 +187,7 @@ impl Shard { let stopped_clone = stopped.clone(); let _handle = thread::spawn(move || { - println!("Inside thread spawn on accept_connections"); Shard::listen(&shard_clone, &stream_clone, stopped_clone); - println!("Listening thread finished"); }); handles.push(_handle); } @@ -219,8 +204,6 @@ impl Shard { tcp_stream: &Arc>, stopped: Arc>, ) { - println!("Listening for incoming messages"); - let mut stream = match tcp_stream.lock() { Ok(stream) => stream, Err(_) => { @@ -238,7 +221,6 @@ impl Shard { } loop { - // println!("Inside listen loop"); let must_stop = match stopped.lock() { Ok(stopped) => stopped, Err(_) => { @@ -248,23 +230,16 @@ impl Shard { }; if *must_stop { - println!("{color_red}STOPPED LISTENING{style_reset}"); drop(stream); return; } std::mem::drop(must_stop); - // println!("LOOPING listen"); - // sleep for 1 millisecond to allow the stream to be ready to read thread::sleep(std::time::Duration::from_millis(1)); let mut buffer = [0; 1024]; - // println!("Before stream lock"); - - // println!("After stream lock"); - match stream.set_nonblocking(true) { Ok(()) => {} Err(e) => { @@ -273,7 +248,6 @@ impl Shard { } } - // println!("Before stream read"); match stream.read(&mut buffer) { Ok(chars) => { if chars == 0 { @@ -308,10 +282,6 @@ impl Shard { } if let Some(response) = shard.get_response_message(message) { - println!( - "{color_bright_green}Received message: {message_string}{style_reset}" - ); - println!("{color_bright_green}Sending response: {response}{style_reset}"); stream.write_all(response.as_bytes()).unwrap(); } } @@ -347,13 +317,11 @@ impl Shard { fn handle_init_connection_message(&mut self, message: Message) -> Option { let router_info = message.get_data().node_info.unwrap(); self.router_info = Arc::new(Mutex::new(Some(router_info.clone()))); - println!("{color_bright_green}Received an InitConnection message{style_reset}"); let response_string = self.get_agreed_connection(); Some(response_string) } fn handle_memory_update_message(&mut self) -> Option { - println!("{color_bright_green}Received an AskMemoryUpdate message{style_reset}"); let response_string = self.get_memory_update_message(); Some(response_string) } @@ -385,9 +353,7 @@ impl Shard { fn get_memory_update_message(&mut self) -> String { match self.update() { - Ok(()) => { - println!("Memory updated successfully"); - } + Ok(()) => {} Err(e) => { eprintln!("Failed to update memory: {e:?}"); } @@ -446,16 +412,13 @@ impl NodeRole for Shard { } fn stop(&mut self) { - println!("{color_red}Stopping shard{style_reset}"); match self.stopped.lock() { Ok(mut stopped) => { - println!("{color_red}Setting stopped to true{style_reset}"); *stopped = true; } Err(_) => { eprintln!("Failed to stop router"); } } - println!("{color_red}Shard stopped{style_reset}"); } } diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 374514fe89055..8093f3590ad81 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -1,5 +1,4 @@ use indexmap::IndexMap; -use inline_colorization::*; use std::{ cmp::Ordering, collections::BinaryHeap, @@ -27,13 +26,11 @@ impl ShardManager { key: value, value: shard_id, }; - println!("Adding shard: {:?}", object); let mut shards = self.shards.lock().unwrap(); shards.push(object); } pub fn peek(&self) -> Option { - println!("Peeking shards: {:?}", self.shards); match self.shards.lock().unwrap().peek() { Some(object) => Some(object.value.clone()), None => None, @@ -45,18 +42,8 @@ impl ShardManager { /// If the memory is lower than the current top shard, it will be placed in the correct position in the heap. /// If the memoty is zero, the shard will be at the base of the heap until it is updated once again. pub fn update_shard_memory(&mut self, memory: f64, shard_id: String) { - println!( - "{color_bright_green}Updating shard memory: {} to {}{style_reset}", - shard_id, memory - ); - self.delete(shard_id.clone()); self.add_shard(memory, shard_id); - - println!( - "{color_bright_green}Shard memory updated: {:?}{style_reset}", - self.shards - ); } fn pop(&mut self) -> Option { @@ -93,8 +80,6 @@ impl ShardManager { pub fn get_max_ids_for_shard_table(&self, shard_id: &str, table: &str) -> Option { let shard_max_ids = self.shard_max_ids.lock().unwrap(); - println!("shard_max_ids: {:?}", shard_max_ids); - println!("shard_id: {:?}, table: {:?}", shard_id, table); match shard_max_ids.get(shard_id) { Some(tables_id_info) => match tables_id_info.get(table) { Some(max_id) => Some(*max_id), diff --git a/sharding/src/psql/psql.rs b/sharding/src/psql/psql.rs index e1efc8e57d62f..e894f602e9454 100644 --- a/sharding/src/psql/psql.rs +++ b/sharding/src/psql/psql.rs @@ -1,11 +1,9 @@ use std::ffi::CStr; extern crate users; use super::super::node::node::*; -use inline_colorization::*; #[no_mangle] pub extern "C" fn SendQueryToShard(query_data: *const i8) -> bool { - println!("{color_blue}{style_bold}SendQueryToShard called{style_reset}"); unsafe { if query_data.is_null() { eprintln!("Received a null pointer"); @@ -21,7 +19,6 @@ pub extern "C" fn SendQueryToShard(query_data: *const i8) -> bool { } }; - println!("Received Query: {:?}", query); handle_query(query.trim()) } } diff --git a/sharding/tests/integration_tests.rs b/sharding/tests/integration_tests.rs index 1187f6c661a88..0bcdae7573b6d 100644 --- a/sharding/tests/integration_tests.rs +++ b/sharding/tests/integration_tests.rs @@ -97,7 +97,6 @@ mod integration_test { let port_clone = "5433".to_string(); let _handle = thread::spawn(move || { Shard::accept_connections(shared_shard, ip_clone, port_clone); - println!("Shard comes back from accept_connections"); }); thread::sleep(std::time::Duration::from_secs(15)); @@ -147,7 +146,6 @@ mod integration_test { let port_clone = "5433".to_string(); let _handle = thread::spawn(move || { Shard::accept_connections(shared_shard, ip_clone, port_clone); - println!("Shard comes back from accept_connections"); }); thread::sleep(std::time::Duration::from_secs(15)); From e7614e7e317ec85b39bea8af3e91a4a9b590d864 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Wed, 4 Dec 2024 18:54:55 -0300 Subject: [PATCH 02/13] Fixed Query error --- sharding/src/node/node.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index a56dd50d867a6..5c4a76c13a7d9 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -4,7 +4,6 @@ use crate::node::client::Client; use crate::utils::node_config::get_nodes_config_raft; use crate::utils::node_config::INIT_HISTORY_FILE_PATH; use crate::utils::queries::print_rows; -use inline_colorization::*; use postgres::Row; use std::ffi::CStr; use std::fmt::Error; @@ -25,22 +24,27 @@ pub trait NodeRole { fn stop(&mut self); fn get_all_tables_from_self(&mut self, check_if_empty: bool) -> Vec { - // Select all tables that have data in them - let query = if check_if_empty { - "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND EXISTS ( SELECT 1 FROM table_name LIMIT 1 )" - } else { - "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'" - }; - + let query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"; let Some(rows) = self.get_rows_for_query(query) else { return Vec::new(); }; - let mut tables = Vec::new(); + + // If no need to check for emptiness, return all table names + if !check_if_empty { + return rows.into_iter().map(|row| row.get(0)).collect(); + } + + // Step 3: Check if tables have data dynamically + let mut non_empty_tables = Vec::new(); for row in rows { let table_name: String = row.get(0); - tables.push(table_name); + let exists_query = format!("SELECT 1 FROM {} LIMIT 1", table_name); + if self.get_rows_for_query(&exists_query).is_some() { + non_empty_tables.push(table_name); + } } - tables + + non_empty_tables } fn get_rows_for_query(&mut self, query: &str) -> Option> { From ed644098f5dd3ac8c932328e2684b7c33ed206a1 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Wed, 4 Dec 2024 20:45:06 -0300 Subject: [PATCH 03/13] Delete/change prints --- sharding/src/node/client.rs | 2 +- sharding/src/node/router.rs | 25 +++++++++---------------- sharding/src/node/send_query_result.rs | 1 - sharding/src/node/shard.rs | 13 ++++++++----- sharding/src/node/shard_manager.rs | 16 ++++++++-------- 5 files changed, 26 insertions(+), 31 deletions(-) diff --git a/sharding/src/node/client.rs b/sharding/src/node/client.rs index 9ac47d422a5e9..b31a12bef7d36 100644 --- a/sharding/src/node/client.rs +++ b/sharding/src/node/client.rs @@ -206,7 +206,7 @@ impl NodeRole for Client { fn send_query(&mut self, query: &str) -> Option { if query == "whoami;" { - println!("> I am Client: {}:{}\n", self.ip, self.port); + println!("{color_bright_green}> I am Client: {}:{}{style_reset}\n", self.ip, self.port); return None; } diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index e968f188790b0..ae93e5f696e58 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -45,7 +45,7 @@ pub struct Router { impl Router { /// Creates a new Router node with the given port and ip, connecting it to the shards specified in the configuration file. pub fn new(ip: &str, port: &str) -> Option { - println!("Inside new::Router"); + println!("Creating Router..."); Router::initialize_router_with_connections(ip, port) } @@ -63,7 +63,6 @@ impl Router { } }; let port = port_number + 1000; - println!("Attempting to bind listener to port: {}", port); let listener = match TcpListener::bind(format!("{}:{}", ip, port)) { Ok(listener) => listener, @@ -73,7 +72,7 @@ impl Router { } }; - println!("wait_for_incoming_connections"); + println!("Waiting for incoming connections.\n\0"); loop { let stopped = { @@ -293,7 +292,9 @@ impl Router { let mut tables = self.get_all_tables_from_shards(); tables.extend(self.get_all_tables_from_self(false)); - println!("Tables: {tables:?}"); + // delete duplicated tables + tables.sort(); + tables.dedup(); for table in tables { let create_query = self.generate_create_table_query(&table, None); @@ -365,12 +366,12 @@ impl Router { .set_health_connection(node_ip.as_str(), node_port.as_str()) .is_err() { - eprintln!("Failed to connect to node: {}", node.name); + eprintln!("Could not connect to node: {} at {}:{}", node.name, node_ip, node_port); return; } let Ok(shard_client) = connect_to_node(&node_ip, &node_port) else { - eprintln!("Failed to connect to port: {node_port}"); + eprintln!("Could not connect to node: {} at {}:{}", node.name, node_ip, node_port); return; }; @@ -507,7 +508,6 @@ impl Router { let max_ids_info = max_ids_info; println!("{color_bright_green}Memory size: {memory_size}{style_reset}"); - println!("{color_bright_green}Max Ids for Shard: {max_ids_info:?}{style_reset}"); self.save_shard_in_manager(memory_size, node_port, max_ids_info); true } @@ -534,7 +534,6 @@ impl Router { println!( "{color_bright_green}Shard {node_port} updated its memory size to {memory_size}{style_reset}" ); - println!("{color_bright_green}Max Ids for Shard: {max_ids_info:?}{style_reset}"); self.update_shard_in_manager(memory_size, node_port, max_ids_info); true } @@ -567,7 +566,7 @@ impl Router { }; let port = port_number + 1000; - println!("Attempting to connect to port: {}", port); + println!("Attempting to connect to: {}:{}", node_ip, port); match TcpStream::connect(format!("{node_ip}:{port}")) { Ok(stream) => { println!( @@ -700,7 +699,7 @@ impl NodeRole for Router { fn send_query(&mut self, received_query: &str) -> Option { if received_query == "whoami;" { - println!("> I am Router: {}:{}\n", self.ip, self.port); + println!("{color_bright_green}> I am Router: {}:{}{style_reset}\n", self.ip, self.port); return None; } @@ -825,7 +824,6 @@ impl NodeRole for Router { return Some("Relation (table) does not exist".to_string()); } - println!("All threads finished"); let responses = match shards_responses.lock() { Ok(shards_responses) => shards_responses.clone(), Err(_) => { @@ -1091,8 +1089,6 @@ impl Router { table ); - println!("Query: {query}"); - let rows = if let Some(shard) = shard_id { println!("Sending query to shard {shard}: {query}"); match self.send_query_to_shard(&shard, &query, false) { @@ -1112,8 +1108,6 @@ impl Router { } }; - println!("Rows: {rows:?}"); - let mut columns_definitions: Vec = rows .iter() .enumerate() @@ -1202,7 +1196,6 @@ impl Router { let drop_query = format!("DELETE FROM {}", table); let _ = self.get_rows_for_query(&drop_query); - println!("Table {} was emptied", table); } } } diff --git a/sharding/src/node/send_query_result.rs b/sharding/src/node/send_query_result.rs index 393b627b33852..036bbc7a0757f 100644 --- a/sharding/src/node/send_query_result.rs +++ b/sharding/src/node/send_query_result.rs @@ -25,7 +25,6 @@ impl std::error::Error for SendQueryError {} pub fn is_connection_closed(err: &Error) -> bool { let err = format!("{}", err); - println!("Analyzing error: {}", err); return err.contains("connection closed") || err.contains("kind: Connection reset by peer"); } diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index 0387a9fd0ca08..2690f5b89a9f4 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -6,6 +6,7 @@ use super::tables_id_info::TablesIdInfo; use crate::node::messages::node_info::find_name_for_node; use crate::utils::common::{connect_to_node, ConvertToString}; use crate::utils::node_config::{get_memory_config, get_nodes_config}; +use crate::utils::queries::query_affects_memory_state; use indexmap::IndexMap; use inline_colorization::*; use postgres::Client as PostgresClient; @@ -50,8 +51,7 @@ impl Shard { let backend: PostgresClient = match connect_to_node(ip, port) { Ok(backend) => backend, Err(e) => { - eprintln!("Failed to connect to the database: {e}"); - panic!("Failed to connect to the database"); + panic!("Failed to connect to the database: {e}"); } }; @@ -78,7 +78,7 @@ impl Shard { let _ = shard.update(); - println!("{color_bright_green}Shard created successfully. Shard: {shard:?}{style_reset}"); + println!("{color_bright_green}Shard created successfully. Shard: {}, {}:{} {style_reset}", shard.name, ip, port); shard } @@ -451,7 +451,7 @@ impl Shard { let max_id: i32 = if let Ok(id) = rows[0].try_get(0) { id } else { - eprintln!("Failed to get max id for table: {table}. Table might be empty",); + // Table is empty 0 }; let mut tables_max_id = match self.tables_max_id.as_ref().try_lock() { @@ -482,7 +482,10 @@ impl NodeRole for Shard { } let rows = self.get_rows_for_query(query)?; - let _ = self.update(); // Updates memory and tables_max_id + if query_affects_memory_state(query) { + let _ = self.update(); // Updates memory and tables_max_id + } + Some(rows.convert_to_string()) } diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index b9578971c98c4..6f4d7e989b0a9 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -30,7 +30,7 @@ impl ShardManager { let mut shards = match self.shards.lock() { Ok(shards) => shards, Err(_) => { - println!("{color_bright_red}Failed to lock shards{style_reset}"); + eprintln!("Failed to lock shards"); return; } }; @@ -41,7 +41,7 @@ impl ShardManager { let shards = match self.shards.lock() { Ok(shards) => shards, Err(_) => { - println!("{color_bright_red}Failed to lock shards{style_reset}"); + eprintln!("Failed to lock shards"); return None; } }; @@ -56,7 +56,7 @@ impl ShardManager { let shards = match self.shards.lock() { Ok(shards) => shards, Err(_) => { - println!("{color_bright_red}Failed to lock shards{style_reset}"); + eprintln!("Failed to lock shards"); return 0; } }; @@ -77,7 +77,7 @@ impl ShardManager { let mut shards = match self.shards.lock() { Ok(shards) => shards, Err(_) => { - println!("{color_bright_red}Failed to lock shards{style_reset}"); + eprintln!("Failed to lock shards"); return None; } }; @@ -107,7 +107,7 @@ impl ShardManager { let mut shards = match self.shards.lock() { Ok(shards) => shards, Err(_) => { - println!("{color_bright_red}Failed to lock shards{style_reset}"); + eprintln!("Failed to lock shards"); return; } }; @@ -128,7 +128,7 @@ impl ShardManager { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, Err(_) => { - println!("{color_bright_red}Failed to lock shard_max_ids{style_reset}"); + eprintln!("Failed to lock shard_max_ids"); return; } }; @@ -140,7 +140,7 @@ impl ShardManager { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, Err(_) => { - println!("{color_bright_red}Failed to lock shard_max_ids{style_reset}"); + eprintln!("Failed to lock shard_max_ids"); return Vec::new(); } }; @@ -159,7 +159,7 @@ impl ShardManager { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, Err(_) => { - println!("{color_bright_red}Failed to lock shard_max_ids{style_reset}"); + eprintln!("Failed to lock shard_max_ids"); return None; } }; From 8df1b5da2ef98816dde3690cfc9ddba963ca8844 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Wed, 4 Dec 2024 20:45:31 -0300 Subject: [PATCH 04/13] Update prints --- sharding/src/node/node.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index 5c4a76c13a7d9..abc8cf4373fe4 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -6,6 +6,7 @@ use crate::utils::node_config::INIT_HISTORY_FILE_PATH; use crate::utils::queries::print_rows; use postgres::Row; use std::ffi::CStr; +use inline_colorization::*; use std::fmt::Error; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, Mutex}; @@ -34,7 +35,6 @@ pub trait NodeRole { return rows.into_iter().map(|row| row.get(0)).collect(); } - // Step 3: Check if tables have data dynamically let mut non_empty_tables = Vec::new(); for row in rows { let table_name: String = row.get(0); @@ -244,8 +244,8 @@ fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { NodeType::Shard }; match change_role(role.to_owned(), transmitter.clone()) { - Ok(_) => { - println!("Role changing finished successfully"); + Ok(msg) => { + println!("{color_bright_green}{msg}{style_reset}"); } Err(_) => { println!("Error could not change role to {:?}", role); @@ -259,7 +259,7 @@ fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { }); } -fn change_role(new_role: NodeType, transmitter: Sender) -> Result<(), Error> { +fn change_role(new_role: NodeType, transmitter: Sender) -> Result { if new_role == NodeType::Client { eprintln!("NodeRole cannot be changed to Client, it is not a valid role"); return Err(Error); @@ -270,7 +270,7 @@ fn change_role(new_role: NodeType, transmitter: Sender) -> Result<(), Erro if node_instance.node_type == new_role { confirm_role_change(transmitter); - return Ok(()); + return Ok("".to_string()); } let ip = node_instance.ip.clone(); @@ -301,7 +301,7 @@ fn change_role(new_role: NodeType, transmitter: Sender) -> Result<(), Erro } confirm_role_change(transmitter); - Ok(()) + Ok(format!("Role changing finished successfully. Node is now {new_role:?}").to_string()) } fn confirm_role_change(transmitter: Sender) { From 790c24d9ba8a0bac9abe651e88de74e441996485 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Sun, 8 Dec 2024 21:24:47 -0300 Subject: [PATCH 05/13] Changed some prints --- sharding/src/node/router.rs | 5 +++-- sharding/src/node/shard.rs | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index ae93e5f696e58..2e5dcf57c66ea 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -570,7 +570,7 @@ impl Router { match TcpStream::connect(format!("{node_ip}:{port}")) { Ok(stream) => { println!( - "{color_bright_green}Health connection established with {node_ip}:{port}{style_reset}" + "{color_bright_green}Connection established with {node_ip}:{port}{style_reset}" ); Ok(Channel { stream: Arc::new(Mutex::new(stream)), @@ -985,6 +985,7 @@ impl Router { } fn send_query_to_backend(&mut self, query: &str) -> Option { + println!("{color_bright_green}Sending query to Self: {query}{style_reset}"); let rows = self.get_rows_for_query(query)?; let response = format_rows_without_offset(rows); Some(response) @@ -1090,7 +1091,7 @@ impl Router { ); let rows = if let Some(shard) = shard_id { - println!("Sending query to shard {shard}: {query}"); + println!("{color_bright_green}Sending query to shard {shard}: {query}{style_reset}"); match self.send_query_to_shard(&shard, &query, false) { Ok(rows) => rows, Err(_) => { diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index 2690f5b89a9f4..8d1bfb9b514e7 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -114,10 +114,6 @@ impl Shard { let mut candidate_stream = match TcpStream::connect(format!("{}:{}", candidate_ip, candidate_port)) { Ok(stream) => { - println!( - "{color_bright_green}Health connection established with {}:{}{style_reset}", - candidate_ip, candidate_port - ); stream } Err(_) => { @@ -131,6 +127,11 @@ impl Shard { name: name.to_string(), }); + println!( + "{color_bright_green}Sending HelloFromNode to {}:{}{style_reset}", + candidate_ip, candidate_port + ); + match candidate_stream.write_all(hello_message.to_string().as_bytes()) { Ok(_) => {} Err(e) => { From d3c0b3e4120ae33f3a788b8d61c3fa58c87949ff Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Sun, 8 Dec 2024 22:17:34 -0300 Subject: [PATCH 06/13] Cargo doc --- sharding/src/node/client.rs | 18 ++++++++++----- sharding/src/node/memory_manager.rs | 3 +++ sharding/src/node/messages/message.rs | 12 ++++++++++ sharding/src/node/messages/message_data.rs | 5 ++++ sharding/src/node/messages/node_info.rs | 6 +++++ sharding/src/node/node.rs | 19 +++++++++++++++ sharding/src/node/router.rs | 27 ++++++++++++++++++++++ sharding/src/node/send_query_result.rs | 3 +++ sharding/src/node/shard.rs | 18 ++++++++++++++- sharding/src/node/shard_manager.rs | 16 +++++++++++++ sharding/src/node/tables_id_info.rs | 3 +++ 11 files changed, 123 insertions(+), 7 deletions(-) diff --git a/sharding/src/node/client.rs b/sharding/src/node/client.rs index b31a12bef7d36..3e4aba7f6371c 100644 --- a/sharding/src/node/client.rs +++ b/sharding/src/node/client.rs @@ -25,6 +25,7 @@ pub struct Client { port: String, } +/// Implementation of Client impl Client { /// Creates a new Client node with the given port pub fn new(ip: &str, port: &str) -> Self { @@ -51,6 +52,7 @@ impl Client { } } + /// Tries to get the router info from the other nodes. If found, it connects to the router and returns the TcpStream. pub fn get_router_info(config: NodesConfig) -> Option { let mut candidate_ip; let mut candidate_port; @@ -151,6 +153,7 @@ impl Client { } } + /// Handles the router id message and returns the TcpStream if the router is found. fn handle_router_id_message(response_message: message::Message) -> Option { if let Some(node_info) = response_message.get_data().node_info { let node_ip = node_info.ip.clone(); @@ -178,6 +181,7 @@ impl Client { None } + /// Handles the received message from a connection. fn handle_received_message(buffer: &mut [u8]) { let message_string = String::from_utf8_lossy(&buffer); let response_message = match message::Message::from_string(&message_string) { @@ -199,11 +203,13 @@ impl Client { } } +/// Implementation of Node for Client impl NodeRole for Client { fn backend(&self) -> Arc> { panic!("Client node does not have a backend"); } + /// Sends query to the router and returns the response. fn send_query(&mut self, query: &str) -> Option { if query == "whoami;" { println!("{color_bright_green}> I am Client: {}:{}{style_reset}\n", self.ip, self.port); @@ -218,7 +224,7 @@ impl NodeRole for Client { query ); - // Intentar obtener el stream actual + // Get current stream let mut stream = match self.router_postgres_client.stream.lock() { Ok(stream) => stream, Err(e) => { @@ -227,7 +233,7 @@ impl NodeRole for Client { } }; - // Intentar enviar el mensaje y reconectar si falla + // Try to send the message. If it fails, reconnect to a new router if let Err(e) = stream.write_all(message.to_string().as_bytes()) { eprintln!( "{color_bright_red}Failed to send the query: {:?}{style_reset}", @@ -235,7 +241,7 @@ impl NodeRole for Client { ); eprintln!("Reconnecting to new router..."); drop(stream); - // Obtener un nuevo router y actualizar el canal + // Get a new router and update the stream let new_stream = match Self::get_router_info(get_nodes_config()) { Some(new_stream) => new_stream, None => { @@ -252,10 +258,10 @@ impl NodeRole for Client { println!("{color_bright_blue}Query sent to router{style_reset}"); - // Preparar el buffer de respuesta + // Prepare the response buffer let mut buffer: [u8; MAX_PAGE_SIZE] = [0; MAX_PAGE_SIZE]; - // Intentar leer la respuesta y reconectar si falla + // Try to read the response and reconnect if it fails match stream.read(&mut buffer) { Ok(chars) if chars > 0 => { Client::handle_received_message(&mut buffer); @@ -266,7 +272,7 @@ impl NodeRole for Client { eprintln!("Reconnecting to new router..."); drop(stream); - // Obtener un nuevo router y actualizar el canal + // Get a new router and update the stream if let Some(new_stream) = Self::get_router_info(get_nodes_config()) { self.router_postgres_client = Channel { stream: Arc::new(Mutex::new(new_stream)), diff --git a/sharding/src/node/memory_manager.rs b/sharding/src/node/memory_manager.rs index f4d921423b581..6d03037d10a6c 100644 --- a/sharding/src/node/memory_manager.rs +++ b/sharding/src/node/memory_manager.rs @@ -12,6 +12,7 @@ pub struct MemoryManager { } impl MemoryManager { + /// Creates a new MemoryManager. pub fn new(unavailable_memory_perc: f64) -> Self { let available_memory_perc = match Self::get_available_memory_percentage(unavailable_memory_perc) { @@ -24,6 +25,7 @@ impl MemoryManager { } } + /// Updates the available memory percentage. pub fn update(&mut self) -> Result<(), io::Error> { self.available_memory_perc = match Self::get_available_memory_percentage(self.unavailable_memory_perc) { @@ -38,6 +40,7 @@ impl MemoryManager { Ok(()) } + /// Returns the available memory percentage. fn get_available_memory_percentage(unavailable_memory_perc: f64) -> Option { if unavailable_memory_perc == 100.0 { return Some(0.0); diff --git a/sharding/src/node/messages/message.rs b/sharding/src/node/messages/message.rs index aa31004165631..67c21661fc7c1 100644 --- a/sharding/src/node/messages/message.rs +++ b/sharding/src/node/messages/message.rs @@ -48,6 +48,7 @@ impl fmt::Debug for Message { impl Message { // --- Constructors --- + /// Creates a new Message with the InitConnection type pub fn new_init_connection(node_info: NodeInfo) -> Self { Message { message_type: MessageType::InitConnection, @@ -58,6 +59,7 @@ impl Message { } } + /// Creates a new Message with the AskMemoryUpdate type pub fn new_ask_memory_update() -> Self { Message { message_type: MessageType::AskMemoryUpdate, @@ -68,6 +70,7 @@ impl Message { } } + /// Creates a new Message with the MemoryUpdate type pub fn new_memory_update(payload: f64, max_ids: TablesIdInfo) -> Self { Message { message_type: MessageType::MemoryUpdate, @@ -78,6 +81,7 @@ impl Message { } } + /// Creates a new Message with the Agreed type pub fn new_agreed(memory_percentage: f64, max_ids: TablesIdInfo) -> Self { Message { message_type: MessageType::Agreed, @@ -88,6 +92,7 @@ impl Message { } } + /// Creates a new Message with the Denied type pub fn new_denied() -> Self { Message { message_type: MessageType::Denied, @@ -98,6 +103,7 @@ impl Message { } } + /// Creates a new Message with the GetRouter type pub fn new_get_router() -> Self { Message { message_type: MessageType::GetRouter, @@ -108,6 +114,7 @@ impl Message { } } + /// Creates a new Message with the RouterId type pub fn new_router_id(node_info: NodeInfo) -> Self { Message { message_type: MessageType::RouterId, @@ -118,6 +125,7 @@ impl Message { } } + /// Creates a new Message with the HelloFromNode type pub fn new_hello_from_node(node_info: NodeInfo) -> Self { Message { message_type: MessageType::HelloFromNode, @@ -128,6 +136,7 @@ impl Message { } } + /// Creates a new Message with the NoRouterData type pub fn new_no_router_data() -> Self { Message { message_type: MessageType::NoRouterData, @@ -138,6 +147,7 @@ impl Message { } } + /// Creates a new Message with the Query type pub fn new_query(sender_info: Option, query: String) -> Self { Message { message_type: MessageType::Query, @@ -148,6 +158,7 @@ impl Message { } } + /// Creates a new Message with the QueryResponse type pub fn new_query_response(query_response: String) -> Self { Message { message_type: MessageType::QueryResponse, @@ -160,6 +171,7 @@ impl Message { // --- Method to get the data --- + /// Get the data from the message pub fn get_data(&self) -> MessageData { match self.message_type { MessageType::InitConnection | MessageType::RouterId | MessageType::HelloFromNode => { diff --git a/sharding/src/node/messages/message_data.rs b/sharding/src/node/messages/message_data.rs index 34e931fe27153..32e4148e99e3f 100644 --- a/sharding/src/node/messages/message_data.rs +++ b/sharding/src/node/messages/message_data.rs @@ -15,6 +15,7 @@ pub struct MessageData { impl MessageData { // - Constructors - + /// Creates a new MessageData with a payload and max_ids pub fn new_payload(payload: f64, max_ids: TablesIdInfo) -> Self { MessageData { payload: Some(payload), @@ -24,6 +25,7 @@ impl MessageData { } } + /// Creates a new MessageData with a node_info pub fn new_node_info(node_info: NodeInfo) -> Self { MessageData { payload: None, @@ -33,6 +35,7 @@ impl MessageData { } } + /// Creates a new MessageData with a query and sender_info pub fn new_query(query: String, sender_info: Option) -> Self { MessageData { payload: None, @@ -42,6 +45,7 @@ impl MessageData { } } + /// Creates a new MessageData with a query response pub fn new_query_response(query_response: String) -> Self { MessageData { payload: None, @@ -51,6 +55,7 @@ impl MessageData { } } + /// Creates a new MessageData with no attributes pub fn new_none() -> Self { MessageData { payload: None, diff --git a/sharding/src/node/messages/node_info.rs b/sharding/src/node/messages/node_info.rs index 86ed7316306f8..d077d4619d55b 100644 --- a/sharding/src/node/messages/node_info.rs +++ b/sharding/src/node/messages/node_info.rs @@ -2,6 +2,7 @@ use std::str::FromStr; use crate::utils::node_config::get_nodes_config; +/// Struct that represents the information of a node: ip, port and name. #[derive(Clone, Debug)] pub struct NodeInfo { pub ip: String, @@ -9,9 +10,11 @@ pub struct NodeInfo { pub name: String, } +/// Implementation of NodeInfo FromStr impl FromStr for NodeInfo { type Err = &'static str; + /// Parses a string to a NodeInfo struct. fn from_str(input: &str) -> Result { // split the input string by ':' let mut parts = input.split(':'); @@ -35,18 +38,21 @@ impl FromStr for NodeInfo { } } +/// Implementation of PartialEq for NodeInfo impl PartialEq for NodeInfo { fn eq(&self, other: &Self) -> bool { self.ip == other.ip && self.port == other.port } } +/// Implementation of Display for NodeInfo impl std::fmt::Display for NodeInfo { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{}:{}", self.ip, self.port) } } +/// Function to find the name of a node given its ip and port. pub fn find_name_for_node(ip: String, port: String) -> Option { let config = get_nodes_config(); for node in config.nodes { diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index abc8cf4373fe4..a0e9128fcf170 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -22,8 +22,10 @@ pub trait NodeRole { /// Sends a query to the shard group fn send_query(&mut self, query: &str) -> Option; + /// Stops the node instance fn stop(&mut self); + /// Returns all tables currently existing in the node's PostgreSQL cluster fn get_all_tables_from_self(&mut self, check_if_empty: bool) -> Vec { let query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"; let Some(rows) = self.get_rows_for_query(query) else { @@ -47,6 +49,7 @@ pub trait NodeRole { non_empty_tables } + /// Query the node's PostgreSQL cluster and return the obtained rows fn get_rows_for_query(&mut self, query: &str) -> Option> { // SQLSTATE Code Error for "relation does not exist" const UNDEFINED_TABLE_CODE: &str = "42P01"; @@ -83,6 +86,7 @@ pub trait NodeRole { } } +/// NodeType enum #[repr(C)] #[derive(Debug, PartialEq, Clone)] pub enum NodeType { @@ -93,6 +97,7 @@ pub enum NodeType { // MARK: Node Singleton +/// NodeInstance struct. It includes: instance, ip, port, and node_type. pub struct NodeInstance { pub instance: Option>, pub ip: String, @@ -100,7 +105,9 @@ pub struct NodeInstance { pub node_type: NodeType, } +/// Implementation of NodeInstance impl NodeInstance { + /// Creates a new NodeInstance. fn new(instance: Box, ip: String, port: String, node_type: NodeType) -> Self { NodeInstance { instance: Some(instance), @@ -113,8 +120,10 @@ impl NodeInstance { // MARK: Node Instance +/// Node Instance. It holds the current node instance. pub static mut NODE_INSTANCE: Option = None; +/// Returns the current node instance. pub fn get_node_instance() -> &'static mut NodeInstance { unsafe { NODE_INSTANCE @@ -123,6 +132,7 @@ pub fn get_node_instance() -> &'static mut NodeInstance { } } +/// Returns the current node role. pub fn get_node_role() -> &'static mut dyn NodeRole { unsafe { match NODE_INSTANCE.as_mut() { @@ -175,6 +185,7 @@ pub extern "C" fn InitNodeInstance(node_type: NodeType, port: *const i8) { // MARK: Raft +/// Runs the Raft instance in a separate task fn run_raft(ip: String, port: String, transmitter: Sender, receiver: Receiver) { thread::spawn(move || { let rt = match Runtime::new() { @@ -194,6 +205,7 @@ fn run_raft(ip: String, port: String, transmitter: Sender, receiver: Recei }); } +/// Creates a new Raft instance async fn new_raft_instance( ip: String, port: String, @@ -234,6 +246,7 @@ async fn new_raft_instance( // MARK: Node Role +/// Listens for changes in the node role from Raft fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { thread::spawn(move || loop { match receiver.recv() { @@ -259,6 +272,7 @@ fn listen_raft_receiver(receiver: Receiver, transmitter: Sender) { }); } +/// Changes the node role if needed. fn change_role(new_role: NodeType, transmitter: Sender) -> Result { if new_role == NodeType::Client { eprintln!("NodeRole cannot be changed to Client, it is not a valid role"); @@ -304,12 +318,14 @@ fn change_role(new_role: NodeType, transmitter: Sender) -> Result) { transmitter .send(true) .expect("Error sending true to raft transmitter"); } +/// Initializes a new node instance based on the node type fn new_node_instance(node_type: NodeType, ip: &str, port: &str) { // Initialize node based on node type match node_type { @@ -319,6 +335,7 @@ fn new_node_instance(node_type: NodeType, ip: &str, port: &str) { } } +/// Initializes a new router fn init_router(ip: &str, port: &str) { // sleep for 5 seconds to allow the stream to be ready to read //thread::sleep(std::time::Duration::from_secs(5)); @@ -348,6 +365,7 @@ fn init_router(ip: &str, port: &str) { }); } +/// Initializes a new shard fn init_shard(ip: &str, port: &str) { let shard = Shard::new(ip, port); @@ -368,6 +386,7 @@ fn init_shard(ip: &str, port: &str) { }); } +/// Initializes a new client fn init_client(ip: &str, port: &str) { unsafe { NODE_INSTANCE = Some(NodeInstance::new( diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index 2e5dcf57c66ea..91c181a1a6163 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -206,6 +206,8 @@ impl Router { } } } + + /// Gets response message from the router, analyzing the message received. fn get_response_message(&mut self, message: &str) -> Option { if message.is_empty() { return None; @@ -232,6 +234,7 @@ impl Router { } } + /// Handles the Query message, sending the query to the shards (or it own backend, if alone) and returning the response. fn handle_query_message(&mut self, message: &Message) -> Option { let query = match message.get_data().query { Some(query) => query, @@ -253,6 +256,7 @@ impl Router { Some(response_message.to_string()) } + /// Handles the GetRouter message, returning its own information. fn handle_get_router_message(&mut self) -> Option { let self_clone = self.clone(); let ip = self_clone.ip.clone().to_string(); @@ -264,6 +268,7 @@ impl Router { Some(response_message.to_string()) } + /// Handles the HelloFromNode message, configuring the connection to the shard and redistributing the data if needed. fn handle_hello_from_node_message(&mut self, message: &Message) -> Option { println!("Received HelloFromNode message"); @@ -288,6 +293,7 @@ impl Router { Some("OK".to_string()) } + /// Duplicates the existing tables in the existing nodes into the brand new shard. fn duplicate_tables_into(&mut self, shard_id: &str) { let mut tables = self.get_all_tables_from_shards(); tables.extend(self.get_all_tables_from_self(false)); @@ -304,6 +310,7 @@ impl Router { } } + /// Gets all existing tables' names from the nodes. fn get_all_tables_from_shards(&mut self) -> Vec { self.shard_manager.get_table_names_for_all() } @@ -346,6 +353,7 @@ impl Router { Some(router) } + /// Configures the connections to the shards specified in the configuration file. fn configure_connections(&mut self) { let config = get_nodes_config(); for shard in config.nodes { @@ -486,6 +494,7 @@ impl Router { } } + /// Handles the Agreed message from the shard, saving the shard in the `ShardManager` with the given memory size and shard id. fn handle_agreed_message(&mut self, message: &Message, node_port: &str) -> bool { println!("{color_bright_green}Shard {node_port} accepted the connection{style_reset}"); @@ -512,6 +521,7 @@ impl Router { true } + /// Handles the MemoryUpdate message from the shard, updating the shard in the `ShardManager` with the given memory size and shard id. fn handle_memory_update_message(&mut self, message: &Message, node_port: &str) -> bool { let payload = match message.get_data().payload { Some(payload) => payload, @@ -616,6 +626,8 @@ impl Router { } } + /// Function that allows the router to get the specific shard that owns a given ID, and format the query with the new ID to be sent to the shard. + /// This is needed because the router abstracts the client from the sharding implementation, joining all the shards' data into a single table with a unique ID, using an offset system to avoid duplicated IDs. So, when a query with a specific ID is received, the router needs to find the specific shard that owns that ID and format the query with the new ID to be sent to the shard. fn get_specific_shard_with(&mut self, mut id: i64, query: &str) -> (Vec, bool, String) { let shards = match self.shards.lock() { Ok(shards) => shards, @@ -663,6 +675,10 @@ impl Router { ) } + /// Formats the response from the shards, unified in a single response with no duplicated IDs. + /// If the query is a SELECT query, the response will be formatted with the offset system. + /// If the query is not a SELECT query, the response will be formatted without the offset system. + /// Returns the formatted response as a String. fn format_response( &self, shards_responses: IndexMap>, @@ -871,6 +887,8 @@ impl NodeRole for Router { // MARK: - Communication with shards impl Router { + + /// Gets the stream for the shard with the given shard id. fn get_stream(&self, shard_id: &str) -> Option>> { let Ok(comm_channels) = self.comm_channels.read() else { eprintln!("Failed to get comm channels"); @@ -885,6 +903,7 @@ impl Router { Some(shard_comm_channel.stream.clone()) } + /// Initializes the message exchange with the shard, sending the message and reading the response. fn init_message_exchange( &mut self, message: &Message, @@ -936,6 +955,7 @@ impl Router { self.init_message_exchange(&message, &mut writable_stream, shard_id); } + /// Sends a query to the shard with the given shard id, returning the rows from the query. fn send_query_to_shard( &mut self, shard_id: &str, @@ -984,6 +1004,7 @@ impl Router { Err((SendQueryError::Other("Shard not found".to_string()), None)) } + /// Sends a query to the router's backend, returning the rows from the query. fn send_query_to_backend(&mut self, query: &str) -> Option { println!("{color_bright_green}Sending query to Self: {query}{style_reset}"); let rows = self.get_rows_for_query(query)?; @@ -991,6 +1012,7 @@ impl Router { Some(response) } + /// Deletes the shard with the given shard id from the router's shards. fn delete_shard(&mut self, shard_id: &str) { println!("Deleting shard {shard_id}"); let mut shard_manager = self.shard_manager.as_ref().clone(); @@ -1009,12 +1031,14 @@ impl Router { // MARK: - Data Redistribution impl Router { + /// Checks if the backend has data. fn backend_has_data(&mut self) -> bool { let query = "SELECT * FROM information_schema.tables WHERE table_schema = 'public'"; let rows = self.get_rows_for_query(query); rows.is_some() } + /// Redistributes the data from the router's backend to the shards. fn redistribute_data(&mut self) { if !self.backend_has_data() { eprintln!("No data found in backend. Skipping redistribution."); @@ -1083,6 +1107,7 @@ impl Router { self.empty_tables(&tables); } + /// Creates a CREATE query for the specified table, getting the column names and data types from the information_schema.columns table. fn generate_create_table_query(&mut self, table: &str, shard_id: Option) -> String { // Dynamically generate CREATE TABLE statement for the specified table, getting the column names and data types from the information_schema.columns table let query = format!( @@ -1134,6 +1159,7 @@ impl Router { ) } + /// Converts a row to an INSERT query. fn row_to_insert_query(&self, row: &Row, table: &str) -> String { let columns = row.columns(); let column_names: Vec = columns @@ -1180,6 +1206,7 @@ impl Router { query } + /// Empties the tables with the given names from the router's backend. fn empty_tables(&mut self, tables: &[String]) { for table in tables { // Check if table has data before trying to delete all diff --git a/sharding/src/node/send_query_result.rs b/sharding/src/node/send_query_result.rs index 036bbc7a0757f..0669c1b475944 100644 --- a/sharding/src/node/send_query_result.rs +++ b/sharding/src/node/send_query_result.rs @@ -11,6 +11,7 @@ pub enum SendQueryError { Other(String), } +/// Implementation of Display for SendQueryError impl fmt::Display for SendQueryError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -23,11 +24,13 @@ impl fmt::Display for SendQueryError { impl std::error::Error for SendQueryError {} +/// Checks if the error is a connection closed error pub fn is_connection_closed(err: &Error) -> bool { let err = format!("{}", err); return err.contains("connection closed") || err.contains("kind: Connection reset by peer"); } +/// Checks if the error is an undefined table error pub fn is_undefined_table(err: &Error) -> bool { if let Some(db_error) = err.as_db_error() { return db_error.code().code() == UNDEFINED_TABLE_CODE; diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index 8d1bfb9b514e7..f0e48398137f2 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -15,6 +15,7 @@ use std::net::{TcpListener, TcpStream}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::{io, thread}; +use std::fmt; extern crate users; @@ -32,7 +33,7 @@ pub struct Shard { pub stopped: Arc>, } -use std::fmt; +/// Implementation of Debug for Shard impl fmt::Debug for Shard { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Shard") @@ -83,12 +84,14 @@ impl Shard { shard } + /// Initializes the memory manager for the shard fn initialize_memory_manager() -> MemoryManager { let config = get_memory_config(); let reserved_memory = config.unavailable_memory_perc; MemoryManager::new(reserved_memory) } + /// Looks for a sharding network by sending a HelloFromNode message to all nodes in the config file pub fn look_for_sharding_network(ip: &str, port: &str, name: &str) { let config = get_nodes_config(); let mut candidate_ip; @@ -144,6 +147,7 @@ impl Shard { } } + /// Accepts incoming connections pub fn accept_connections(shared_shard: Arc>, ip: String, accepting_port: String) { let port = match accepting_port.parse::() { Ok(port) => port + 1000, @@ -331,6 +335,7 @@ impl Shard { message.get_message_type() == MessageType::HelloFromNode } + /// Gets a response for the given message fn get_response_message(&mut self, message: Message) -> Option { match message.get_message_type() { MessageType::InitConnection => self.handle_init_connection_message(message), @@ -347,6 +352,8 @@ impl Shard { } } + /// Handles an InitConnection message + /// This message is used to establish a connection with the router fn handle_init_connection_message(&mut self, message: Message) -> Option { let router_info = message.get_data().node_info?; self.router_info = Arc::new(Mutex::new(Some(router_info.clone()))); @@ -354,11 +361,15 @@ impl Shard { Some(response_string) } + /// Handles a MemoryUpdate message + /// This message is used to update the memory of the shard fn handle_memory_update_message(&mut self) -> Option { let response_string = self.get_memory_update_message()?; Some(response_string) } + /// Handles a GetRouter message + /// This message is used to get the router info fn handle_get_router_message(&mut self) -> Option { let self_clone = self.clone(); let router_info: Option = { @@ -381,6 +392,8 @@ impl Shard { } } + /// Gets all tables from the shard + /// It will return a message of type Agreed with the memory percentage and the tables max id, parsed to String. fn get_agreed_connection(&self) -> Option { let memory_manager = match self.memory_manager.as_ref().try_lock() { Ok(memory_manager) => memory_manager, @@ -402,6 +415,8 @@ impl Shard { Some(response_message.to_string()) } + /// Gets a MemoryUpdate message, parsed to String + /// It will include the memory percentage and the tables max id fn get_memory_update_message(&mut self) -> Option { match self.update() { Ok(()) => {} @@ -429,6 +444,7 @@ impl Shard { Some(response_message.to_string()) } + /// Updates the memoryManager and tables_max_id fn update(&mut self) -> Result<(), io::Error> { self.set_max_ids(); match self.memory_manager.as_ref().try_lock() { diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 6f4d7e989b0a9..5c9f529b81ea6 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -8,6 +8,7 @@ use std::{ use super::tables_id_info::TablesIdInfo; +/// Struct that manages the shards' memory and max ids in each table. #[derive(Debug, Clone)] pub(crate) struct ShardManager { shards: Arc>>, @@ -15,6 +16,8 @@ pub(crate) struct ShardManager { } impl ShardManager { + + /// Creates a new ShardManager. pub fn new() -> Self { ShardManager { shards: Arc::new(Mutex::new(BinaryHeap::new())), @@ -22,6 +25,7 @@ impl ShardManager { } } + /// Adds a shard to the heap. pub fn add_shard(&mut self, value: f64, shard_id: String) { let object = ShardManagerObject { key: value, @@ -37,6 +41,7 @@ impl ShardManager { shards.push(object); } + /// Returns the top shard in the heap. pub fn peek(&self) -> Option { let shards = match self.shards.lock() { Ok(shards) => shards, @@ -52,6 +57,7 @@ impl ShardManager { } } + /// Returns the number of shards in the heap. pub fn count(&self) -> usize { let shards = match self.shards.lock() { Ok(shards) => shards, @@ -73,6 +79,8 @@ impl ShardManager { self.add_shard(memory, shard_id); } + /// Pops the top shard from the heap. + /// If the heap is empty, it will return None. fn pop(&mut self) -> Option { let mut shards = match self.shards.lock() { Ok(shards) => shards, @@ -124,6 +132,8 @@ impl ShardManager { *shards = new_shards; } + /// Saves the max ids for a shard. + /// This function is called when the router receives a message from a shard with the max ids for each table. pub fn save_max_ids_for_shard(&mut self, shard_id: String, tables_id_info: TablesIdInfo) { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, @@ -136,6 +146,7 @@ impl ShardManager { shard_max_ids.insert(shard_id, tables_id_info); } + /// Returns the names of the tables existing in all shards. pub fn get_table_names_for_all(&self) -> Vec { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, @@ -155,6 +166,7 @@ impl ShardManager { table_names } + /// Returns the max ids for a shard and table. pub fn get_max_ids_for_shard_table(&self, shard_id: &str, table: &str) -> Option { let shard_max_ids = match self.shard_max_ids.lock() { Ok(shard_max_ids) => shard_max_ids, @@ -180,6 +192,7 @@ struct ShardManagerObject { value: String, } +/// Implementing Ord for ShardManagerObject impl Ord for ShardManagerObject { fn cmp(&self, other: &Self) -> Ordering { match self.partial_cmp(other) { @@ -189,18 +202,21 @@ impl Ord for ShardManagerObject { } } +/// Implementing PartialOrd for ShardManagerObject impl PartialOrd for ShardManagerObject { fn partial_cmp(&self, other: &Self) -> Option { self.key.partial_cmp(&other.key) } } +/// Implementing PartialEq for ShardManagerObject impl PartialEq for ShardManagerObject { fn eq(&self, other: &Self) -> bool { self.key == other.key } } +/// Implementing Eq for ShardManagerObject impl Eq for ShardManagerObject {} #[cfg(test)] diff --git a/sharding/src/node/tables_id_info.rs b/sharding/src/node/tables_id_info.rs index 390a1a5586d49..b65b19d30343a 100644 --- a/sharding/src/node/tables_id_info.rs +++ b/sharding/src/node/tables_id_info.rs @@ -1,8 +1,10 @@ use crate::utils::common::{ConvertToString, FromString}; use indexmap::IndexMap; +/// Type alias for the tables id info, which is a hashmap with the table name as key and the max id available in said table as value. pub type TablesIdInfo = IndexMap; +/// Implementation of ConvertToString for TablesIdInfo impl ConvertToString for TablesIdInfo { fn convert_to_string(&self) -> String { if self.is_empty() { @@ -17,6 +19,7 @@ impl ConvertToString for TablesIdInfo { } } +/// Implementation of FromString for TablesIdInfo impl FromString for TablesIdInfo { fn from_string(string: &str) -> Self { let mut result = IndexMap::new(); From 9a3c219e7e535543a3b8ebf86651058dfe67a45a Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Sun, 8 Dec 2024 22:17:45 -0300 Subject: [PATCH 07/13] cargo fmt --- sharding/src/node/client.rs | 5 ++++- sharding/src/node/node.rs | 11 ++++++----- sharding/src/node/router.rs | 20 ++++++++++++++------ sharding/src/node/shard.rs | 11 ++++++----- sharding/src/node/shard_manager.rs | 1 - 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/sharding/src/node/client.rs b/sharding/src/node/client.rs index 3e4aba7f6371c..749906915be66 100644 --- a/sharding/src/node/client.rs +++ b/sharding/src/node/client.rs @@ -212,7 +212,10 @@ impl NodeRole for Client { /// Sends query to the router and returns the response. fn send_query(&mut self, query: &str) -> Option { if query == "whoami;" { - println!("{color_bright_green}> I am Client: {}:{}{style_reset}\n", self.ip, self.port); + println!( + "{color_bright_green}> I am Client: {}:{}{style_reset}\n", + self.ip, self.port + ); return None; } diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index a0e9128fcf170..15684ca42250d 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -4,9 +4,9 @@ use crate::node::client::Client; use crate::utils::node_config::get_nodes_config_raft; use crate::utils::node_config::INIT_HISTORY_FILE_PATH; use crate::utils::queries::print_rows; +use inline_colorization::*; use postgres::Row; use std::ffi::CStr; -use inline_colorization::*; use std::fmt::Error; use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::{Arc, Mutex}; @@ -27,16 +27,17 @@ pub trait NodeRole { /// Returns all tables currently existing in the node's PostgreSQL cluster fn get_all_tables_from_self(&mut self, check_if_empty: bool) -> Vec { - let query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"; + let query = + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"; let Some(rows) = self.get_rows_for_query(query) else { return Vec::new(); }; - + // If no need to check for emptiness, return all table names if !check_if_empty { return rows.into_iter().map(|row| row.get(0)).collect(); } - + let mut non_empty_tables = Vec::new(); for row in rows { let table_name: String = row.get(0); @@ -45,7 +46,7 @@ pub trait NodeRole { non_empty_tables.push(table_name); } } - + non_empty_tables } diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index 91c181a1a6163..1dfbe00cf27f6 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -298,7 +298,7 @@ impl Router { let mut tables = self.get_all_tables_from_shards(); tables.extend(self.get_all_tables_from_self(false)); - // delete duplicated tables + // delete duplicated tables tables.sort(); tables.dedup(); @@ -374,12 +374,18 @@ impl Router { .set_health_connection(node_ip.as_str(), node_port.as_str()) .is_err() { - eprintln!("Could not connect to node: {} at {}:{}", node.name, node_ip, node_port); + eprintln!( + "Could not connect to node: {} at {}:{}", + node.name, node_ip, node_port + ); return; } let Ok(shard_client) = connect_to_node(&node_ip, &node_port) else { - eprintln!("Could not connect to node: {} at {}:{}", node.name, node_ip, node_port); + eprintln!( + "Could not connect to node: {} at {}:{}", + node.name, node_ip, node_port + ); return; }; @@ -626,7 +632,7 @@ impl Router { } } - /// Function that allows the router to get the specific shard that owns a given ID, and format the query with the new ID to be sent to the shard. + /// Function that allows the router to get the specific shard that owns a given ID, and format the query with the new ID to be sent to the shard. /// This is needed because the router abstracts the client from the sharding implementation, joining all the shards' data into a single table with a unique ID, using an offset system to avoid duplicated IDs. So, when a query with a specific ID is received, the router needs to find the specific shard that owns that ID and format the query with the new ID to be sent to the shard. fn get_specific_shard_with(&mut self, mut id: i64, query: &str) -> (Vec, bool, String) { let shards = match self.shards.lock() { @@ -715,7 +721,10 @@ impl NodeRole for Router { fn send_query(&mut self, received_query: &str) -> Option { if received_query == "whoami;" { - println!("{color_bright_green}> I am Router: {}:{}{style_reset}\n", self.ip, self.port); + println!( + "{color_bright_green}> I am Router: {}:{}{style_reset}\n", + self.ip, self.port + ); return None; } @@ -887,7 +896,6 @@ impl NodeRole for Router { // MARK: - Communication with shards impl Router { - /// Gets the stream for the shard with the given shard id. fn get_stream(&self, shard_id: &str) -> Option>> { let Ok(comm_channels) = self.comm_channels.read() else { diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index f0e48398137f2..a8fcf84deb4f3 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -10,12 +10,12 @@ use crate::utils::queries::query_affects_memory_state; use indexmap::IndexMap; use inline_colorization::*; use postgres::Client as PostgresClient; +use std::fmt; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::{io, thread}; -use std::fmt; extern crate users; @@ -79,7 +79,10 @@ impl Shard { let _ = shard.update(); - println!("{color_bright_green}Shard created successfully. Shard: {}, {}:{} {style_reset}", shard.name, ip, port); + println!( + "{color_bright_green}Shard created successfully. Shard: {}, {}:{} {style_reset}", + shard.name, ip, port + ); shard } @@ -116,9 +119,7 @@ impl Shard { let mut candidate_stream = match TcpStream::connect(format!("{}:{}", candidate_ip, candidate_port)) { - Ok(stream) => { - stream - } + Ok(stream) => stream, Err(_) => { continue; } diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 5c9f529b81ea6..6d325ada991e1 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -16,7 +16,6 @@ pub(crate) struct ShardManager { } impl ShardManager { - /// Creates a new ShardManager. pub fn new() -> Self { ShardManager { From ed8245bd2594961307d4a13b31045d475e810c40 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Sun, 8 Dec 2024 22:29:07 -0300 Subject: [PATCH 08/13] Remove unneeded imports --- sharding/src/node/shard_manager.rs | 1 - sharding/tests/integration_tests.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 6d325ada991e1..7e3812d767926 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -1,5 +1,4 @@ use indexmap::IndexMap; -use inline_colorization::*; use std::{ cmp::Ordering, collections::BinaryHeap, diff --git a/sharding/tests/integration_tests.rs b/sharding/tests/integration_tests.rs index 4559f76c3173e..6c31373886b7d 100644 --- a/sharding/tests/integration_tests.rs +++ b/sharding/tests/integration_tests.rs @@ -2,7 +2,7 @@ use core::panic; use postgres::{Client, NoTls}; -use sharding::node::node::{get_node_role, InitNodeInstance, NodeRole, NodeType}; +use sharding::node::node::NodeRole; use std::{ fs, io::Write, From cf3ff06cff7855df30abd325cf4d42f0c2006041 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Sun, 8 Dec 2024 22:31:34 -0300 Subject: [PATCH 09/13] Cargo clippy fix --- sharding/src/node/client.rs | 4 ++-- sharding/src/node/memory_manager.rs | 2 +- sharding/src/node/messages/message.rs | 9 +++------ sharding/src/node/router.rs | 13 ++++++------- sharding/src/node/send_query_result.rs | 4 ++-- sharding/src/node/shard.rs | 4 ++-- sharding/src/node/shard_manager.rs | 10 ++-------- sharding/src/psql/psql.rs | 5 +---- sharding/src/utils/common.rs | 7 ++----- sharding/src/utils/queries.rs | 14 +++++--------- 10 files changed, 26 insertions(+), 46 deletions(-) diff --git a/sharding/src/node/client.rs b/sharding/src/node/client.rs index 749906915be66..0e6eca4e68a78 100644 --- a/sharding/src/node/client.rs +++ b/sharding/src/node/client.rs @@ -171,7 +171,7 @@ impl Client { println!( "{color_bright_green}Connected to router stream {}:{}{style_reset}", node_ip, - connections_port.to_string() + connections_port ); return Some(router_stream); } @@ -183,7 +183,7 @@ impl Client { /// Handles the received message from a connection. fn handle_received_message(buffer: &mut [u8]) { - let message_string = String::from_utf8_lossy(&buffer); + let message_string = String::from_utf8_lossy(buffer); let response_message = match message::Message::from_string(&message_string) { Ok(message) => message, Err(_) => { diff --git a/sharding/src/node/memory_manager.rs b/sharding/src/node/memory_manager.rs index 6d03037d10a6c..000c0e1548f96 100644 --- a/sharding/src/node/memory_manager.rs +++ b/sharding/src/node/memory_manager.rs @@ -96,7 +96,7 @@ mod tests { let unavailable_memory_perc = 0.0; let available_memory_perc = MemoryManager::get_available_memory_percentage(unavailable_memory_perc); - assert_eq!(available_memory_perc.is_some(), true); + assert!(available_memory_perc.is_some()); } #[test] diff --git a/sharding/src/node/messages/message.rs b/sharding/src/node/messages/message.rs index 67c21661fc7c1..73f812cd49ab0 100644 --- a/sharding/src/node/messages/message.rs +++ b/sharding/src/node/messages/message.rs @@ -270,7 +270,7 @@ impl Message { // Query Data result.push(' '); if let Some(query) = &self.query_data { - result.push_str(&query); + result.push_str(query); } else { result.push_str("None"); } @@ -331,10 +331,7 @@ impl Message { query.push_str(part); query.push(' '); } - match query.split(';').next() { - Some(query) => Some(query.to_string()), - None => None, - } + query.split(';').next().map(|query| query.to_string()) } None => None, }; @@ -601,7 +598,7 @@ mod tests { "INIT_CONNECTION 0.5 departments:5,employees:3 None None\n", ]; - assert!(options.contains(&&message.to_string().as_str())); + assert!(options.contains(&message.to_string().as_str())); } #[test] diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index 1dfbe00cf27f6..36f35d87b33b9 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -741,7 +741,7 @@ impl NodeRole for Router { } // If the router suddenly is alone, it's going to have to hold on to the data until the shards are available again. So the tables must be available in its backend too. - if query_is_create_or_drop(&received_query) { + if query_is_create_or_drop(received_query) { _ = self.send_query_to_backend(received_query); } @@ -798,7 +798,7 @@ impl NodeRole for Router { }; rows_lock.extend(shard_response); } - return Ok(()); + Ok(()) }); handles.push(_shard_response_handle); } @@ -990,13 +990,13 @@ impl Router { Err(error) => { eprintln!("Failed to send the query to the shard: "); if is_undefined_table(&error) { - eprint!("Relation (table) does not exist\n"); + eprintln!("Relation (table) does not exist"); return Err((SendQueryError::UndefinedTable, None)); } else if is_connection_closed(&error) { println!("Connection closed with shard {shard_id}"); return Err((SendQueryError::ClientIsClosed, Some(shard_id.to_string()))); } else { - eprint!("{error:?}\n"); + eprintln!("{error:?}"); } return Err((SendQueryError::Other(format!("{error:?}")), None)); } @@ -1144,8 +1144,7 @@ impl Router { let mut columns_definitions: Vec = rows .iter() - .enumerate() - .map(|(_, row)| { + .map(|row| { let column_name: String = row.get("column_name"); // PostgresClient does not support getting the PrimaryKey, so all tables will have a SERIAL PRIMARY KEY called "id". If you want to fix this, be my guest let data_type: String = if column_name == "id" { @@ -1226,7 +1225,7 @@ impl Router { } }; - if rows.len() == 0 { + if rows.is_empty() { continue; } diff --git a/sharding/src/node/send_query_result.rs b/sharding/src/node/send_query_result.rs index 0669c1b475944..4b1fb8d0c44c9 100644 --- a/sharding/src/node/send_query_result.rs +++ b/sharding/src/node/send_query_result.rs @@ -27,7 +27,7 @@ impl std::error::Error for SendQueryError {} /// Checks if the error is a connection closed error pub fn is_connection_closed(err: &Error) -> bool { let err = format!("{}", err); - return err.contains("connection closed") || err.contains("kind: Connection reset by peer"); + err.contains("connection closed") || err.contains("kind: Connection reset by peer") } /// Checks if the error is an undefined table error @@ -35,5 +35,5 @@ pub fn is_undefined_table(err: &Error) -> bool { if let Some(db_error) = err.as_db_error() { return db_error.code().code() == UNDEFINED_TABLE_CODE; } - return false; + false } diff --git a/sharding/src/node/shard.rs b/sharding/src/node/shard.rs index a8fcf84deb4f3..6b1708887059f 100644 --- a/sharding/src/node/shard.rs +++ b/sharding/src/node/shard.rs @@ -452,10 +452,10 @@ impl Shard { Ok(mut memory_manager) => memory_manager.update(), Err(_) => { eprintln!("Failed to get memory manager"); - return Err(io::Error::new( + Err(io::Error::new( io::ErrorKind::Other, "Failed to get memory manager", - )); + )) } } } diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 7e3812d767926..506e7a1ffbaf6 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -49,10 +49,7 @@ impl ShardManager { } }; - match shards.peek() { - Some(object) => Some(object.value.clone()), - None => None, - } + shards.peek().map(|object| object.value.clone()) } /// Returns the number of shards in the heap. @@ -175,10 +172,7 @@ impl ShardManager { }; match shard_max_ids.get(shard_id) { - Some(tables_id_info) => match tables_id_info.get(table) { - Some(max_id) => Some(*max_id), - None => None, - }, + Some(tables_id_info) => tables_id_info.get(table).copied(), None => None, } } diff --git a/sharding/src/psql/psql.rs b/sharding/src/psql/psql.rs index e894f602e9454..9a199c92cc6a8 100644 --- a/sharding/src/psql/psql.rs +++ b/sharding/src/psql/psql.rs @@ -25,8 +25,5 @@ pub extern "C" fn SendQueryToShard(query_data: *const i8) -> bool { fn handle_query(query: &str) -> bool { let node_instance = get_node_role(); - match node_instance.send_query(query) { - Some(_) => true, - None => false, - } + node_instance.send_query(query).is_some() } diff --git a/sharding/src/utils/common.rs b/sharding/src/utils/common.rs index 65166f67469c0..ca9ec3567b9c6 100644 --- a/sharding/src/utils/common.rs +++ b/sharding/src/utils/common.rs @@ -16,17 +16,14 @@ pub fn get_username_dynamically() -> String { pub fn connect_to_node(ip: &str, port: &str) -> Result { let username = get_username_dynamically(); - match PostgresClient::connect( + PostgresClient::connect( format!( "host={} port={} user={} dbname=template1", ip, port, username ) .as_str(), NoTls, - ) { - Ok(shard_client) => Ok(shard_client), - Err(e) => Err(e), - } + ) } #[derive(Clone)] diff --git a/sharding/src/utils/queries.rs b/sharding/src/utils/queries.rs index cdf9275f33779..1cf43d75a45b4 100644 --- a/sharding/src/utils/queries.rs +++ b/sharding/src/utils/queries.rs @@ -88,17 +88,13 @@ fn get_id_index(query: &str) -> Option { let index3 = query_aux.find(query_substring3); let index4 = query_aux.find(query_substring4); - return if let Some(index1) = index1 { + if let Some(index1) = index1 { Some(index1 + offset1) } else if let Some(index2) = index2 { Some(index2 + offset2) } else if let Some(index3) = index3 { Some(index3 + offset3) - } else if let Some(index4) = index4 { - Some(index4 + offset4) - } else { - None - }; + } else { index4.map(|index4| index4 + offset4) } } fn get_trimmed_id(query: &str, from: usize) -> String { @@ -182,11 +178,11 @@ impl ConvertToStringOffset for Row { } for (i, _) in self.columns().iter().enumerate() { - let is_id = self.columns()[i].name().to_string() == "id"; + let is_id = self.columns()[i].name() == "id"; // Try to get the value as a String, If it fails, try to get it as an i32. Same for f64 and Decimal let formatted_value = match self.try_get::(i) { - Ok(v) => format!("{}", v), + Ok(v) => v.to_string(), Err(_) => match self.try_get::(i) { Ok(v) => format!("{}", v), Err(_) => match self.try_get::(i) { @@ -245,7 +241,7 @@ impl ConvertToString for Vec { fn get_column_names(columns: &[Column]) -> String { let mut result = String::new(); for column in columns { - result.push_str(&column.name().to_string()); + result.push_str(column.name()); result.push_str(" | "); } result From ae78c695eac3da5c52d32bc099f5fa2864bb9d3e Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Wed, 11 Dec 2024 21:30:59 -0300 Subject: [PATCH 10/13] Fix redistribute_data --- sharding/src/node/router.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index 36f35d87b33b9..3ca72f667fd7c 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -286,10 +286,6 @@ impl Router { name: node_info.name, }); - self.duplicate_tables_into(&node_info.port); - if self.shard_manager.count() == 1 { - self.redistribute_data(); - } Some("OK".to_string()) } From 5a1d72180aecda39a96f440ccef82fea8dd73fd0 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Tue, 17 Dec 2024 21:38:08 -0300 Subject: [PATCH 11/13] Remove comments --- sharding/src/node/node.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/sharding/src/node/node.rs b/sharding/src/node/node.rs index 15684ca42250d..6ef65cb749977 100644 --- a/sharding/src/node/node.rs +++ b/sharding/src/node/node.rs @@ -338,8 +338,6 @@ fn new_node_instance(node_type: NodeType, ip: &str, port: &str) { /// Initializes a new router fn init_router(ip: &str, port: &str) { - // sleep for 5 seconds to allow the stream to be ready to read - //thread::sleep(std::time::Duration::from_secs(5)); let router = match Router::new(ip, port) { Some(router) => router, @@ -362,7 +360,6 @@ fn init_router(ip: &str, port: &str) { let port_clone = port.to_string(); let _handle = thread::spawn(move || { Router::wait_for_incoming_connections(&shared_router, ip_clone, port_clone); - println!("Router comes back from wait_for_incoming_connections"); }); } From 523db0be8f14ae874b8fc1c1a555c17815829f88 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Tue, 17 Dec 2024 21:40:00 -0300 Subject: [PATCH 12/13] Update try_duplicating_tables --- sharding/src/node/router.rs | 55 +++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/sharding/src/node/router.rs b/sharding/src/node/router.rs index 3ca72f667fd7c..fed6ca10c6220 100644 --- a/sharding/src/node/router.rs +++ b/sharding/src/node/router.rs @@ -286,11 +286,12 @@ impl Router { name: node_info.name, }); + self.try_duplicate_tables_into(&node_info.port); Some("OK".to_string()) } /// Duplicates the existing tables in the existing nodes into the brand new shard. - fn duplicate_tables_into(&mut self, shard_id: &str) { + fn try_duplicate_tables_into(&mut self, shard_id: &str) { let mut tables = self.get_all_tables_from_shards(); tables.extend(self.get_all_tables_from_self(false)); @@ -298,6 +299,8 @@ impl Router { tables.sort(); tables.dedup(); + println!("Tables to be duplicated: {tables:?}"); + for table in tables { let create_query = self.generate_create_table_query(&table, None); if !create_query.is_empty() { @@ -836,7 +839,7 @@ impl NodeRole for Router { } Err(_) => { // The thread panicked - println!("Thread panicked"); + println!("Thread ended"); } }; } @@ -979,7 +982,7 @@ impl Router { } }; - println!("Sending query to shard {shard_id}: {query}"); + println!("{color_bright_green}Sending query to shard {shard_id}: {query}{style_reset}"); if let Some(shard) = shards.get_mut(shard_id) { let rows = match shard.query(query, &[]) { Ok(rows) => rows, @@ -1065,10 +1068,16 @@ impl Router { // drop shard lock drop(shards); - let tables = self.get_all_tables_from_self(true); + let mut tables = self.get_all_tables_from_self(true); + + tables.sort(); + tables.dedup(); + + println!("Tables to be redistributed: {tables:?}"); // Prepare data structures let mut starting_queries = Vec::new(); + let mut insert_queries = Vec::new(); for table in &tables { // Generate CREATE TABLE query @@ -1077,7 +1086,7 @@ impl Router { starting_queries.push(create_query); } - let mut insert_queries = Vec::new(); + // Fetch all rows from the table if let Some(rows) = self.get_rows_for_query(&format!("SELECT * FROM {}", table)) { // Convert each row to an INSERT query and store it @@ -1086,27 +1095,28 @@ impl Router { insert_queries.push(insert_query); } } + } - // Send queries to appropriate shards - for insert_query in &insert_queries { - let (shards, _, formatted_query) = self.get_data_needed_from(insert_query); + // Send starting queries to appropriate shards + for starting_query in &starting_queries { + let (shards, _, formatted_query) = self.get_data_needed_from(starting_query); - for shard_id in shards { - // Send `starting_query` if it hasn't been sent for this table - let table_starting_query = - match starting_queries.iter().find(|q| q.contains(table)) { - Some(q) => q, - None => { - continue; - } - }; - _ = self.send_query_to_shard(&shard_id, table_starting_query, false); + for shard_id in shards { + // Send `starting_query` if it hasn't been sent for this table + _ = self.send_query_to_shard(&shard_id, &formatted_query, false); + } + } - // Send the actual insert query - _ = self.send_query_to_shard(&shard_id, &formatted_query, true); - } + // Send queries to appropriate shards + for insert_query in &insert_queries { + let (shards, _, formatted_query) = self.get_data_needed_from(insert_query); + + for shard_id in shards { + // Send the actual insert query + _ = self.send_query_to_shard(&shard_id, &formatted_query, true); } } + // Drop all tables from router backend self.empty_tables(&tables); } @@ -1203,9 +1213,6 @@ impl Router { column_names.join(", "), result.join(", ") ); - - println!("{color_bright_green}Query: {query:?}{style_reset}"); - query } From d31f8bf4385444a1a372c32a67a8e12f86f8c066 Mon Sep 17 00:00:00 2001 From: aldoRastrelli Date: Tue, 17 Dec 2024 22:13:11 -0300 Subject: [PATCH 13/13] Fix warnings --- sharding/src/node/memory_manager.rs | 68 ++++++++++++----------------- sharding/src/node/shard_manager.rs | 13 ------ 2 files changed, 29 insertions(+), 52 deletions(-) diff --git a/sharding/src/node/memory_manager.rs b/sharding/src/node/memory_manager.rs index 000c0e1548f96..bd75f279dcaad 100644 --- a/sharding/src/node/memory_manager.rs +++ b/sharding/src/node/memory_manager.rs @@ -1,7 +1,5 @@ -use libc::statvfs; -use std::ffi::CString; use std::io; -use sysinfo::System; +use sysinfo::Disks; /// This struct represents the Memory Manager in the distributed system. /// It will manage the memory of the node and will be used to determine if the node should accept new requests. @@ -46,43 +44,35 @@ impl MemoryManager { return Some(0.0); } - // Create a System object - let mut sys = System::new_all(); - - // Refresh system data - sys.refresh_all(); - - // Get the root directory information - let path = match CString::new("/") { - Ok(path) => path, - Err(_) => return None, - }; - let mut stat: statvfs = unsafe { std::mem::zeroed() }; - - if unsafe { statvfs(path.as_ptr(), &mut stat) } == 0 { - let total_space = ((stat.f_blocks as u64) * stat.f_frsize) / 1024; - let available_space = ((stat.f_bavail as u64) * stat.f_frsize) / 1024; - - // if percentage is greater than 1, it means that the total of space used exceeds the threshold. - // If so, return 0 - let total = total_space as f64; - let threshold_size = total * (unavailable_memory_perc / 100.0); - - if threshold_size > available_space as f64 { - return Some(0.0); - } - - let usable_available_space = available_space as f64 - threshold_size; - let usable_total_space = total - threshold_size / 100.0; - - let percentage = usable_available_space / usable_total_space * 100.0; - if percentage > 100.0 { - return Some(0.0); - } - Some(percentage) - } else { - None + // Create a Disk object + let disks = Disks::new_with_refreshed_list(); + + let mut total_space: u64 = 0; + let mut available_space: u64 = 0; + + // Get the total and available space of the disk + for disk in &disks { + total_space += disk.total_space(); + available_space += disk.available_space(); + } + + // if percentage is greater than 1, it means that the total of space used exceeds the threshold. + // If so, return 0 + let total = total_space as f64; + let threshold_size = total * (unavailable_memory_perc / 100.0); + + if threshold_size > available_space as f64 { + return Some(0.0); + } + + let usable_available_space = available_space as f64 - threshold_size; + let usable_total_space = total - threshold_size / 100.0; + + let percentage = usable_available_space / usable_total_space * 100.0; + if percentage > 100.0 { + return Some(0.0); } + Some(percentage) } } diff --git a/sharding/src/node/shard_manager.rs b/sharding/src/node/shard_manager.rs index 506e7a1ffbaf6..b582765ab6fb1 100644 --- a/sharding/src/node/shard_manager.rs +++ b/sharding/src/node/shard_manager.rs @@ -52,19 +52,6 @@ impl ShardManager { shards.peek().map(|object| object.value.clone()) } - /// Returns the number of shards in the heap. - pub fn count(&self) -> usize { - let shards = match self.shards.lock() { - Ok(shards) => shards, - Err(_) => { - eprintln!("Failed to lock shards"); - return 0; - } - }; - - shards.len() - } - /// Updates the memory of a shard and reorders the shards based on the new memory. /// If the memory is higher than the current top shard, it will become the new top shard. /// If the memory is lower than the current top shard, it will be placed in the correct position in the heap.