diff --git a/.gitignore b/.gitignore
index 51247532..9b8f1dc2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -93,6 +93,9 @@ target/
 profile_default/
 ipython_config.py
 
+# Node
+/src/node_modules/
+
 # pyenv
 # For a library or package, you might want to ignore these files since the code is
 # intended to run in multiple environments; otherwise, check them in:
diff --git a/src/algos/MetaL2C.py b/src/algos/MetaL2C.py
index beb267de..f0e0edf5 100644
--- a/src/algos/MetaL2C.py
+++ b/src/algos/MetaL2C.py
@@ -122,7 +122,7 @@ def __init__(
     ) -> None:
         super().__init__(config, comm_utils)
 
-        self.encoder = ModelEncoder(self.get_model_weights())
+        self.encoder = ModelEncoder(self.get_model_weights(get_external_repr=False))
         self.encoder_optim = optim.SGD(
             self.encoder.parameters(), lr=self.config["alpha_lr"]
         )
diff --git a/src/algos/base_class.py b/src/algos/base_class.py
index 899fcd47..f6b24462 100644
--- a/src/algos/base_class.py
+++ b/src/algos/base_class.py
@@ -29,6 +29,12 @@
     TransformDataset,
     CorruptDataset,
 )
+
+# import the possible attacks
+from algos.attack_add_noise import AddNoiseAttack
+from algos.attack_bad_weights import BadWeightsAttack
+from algos.attack_sign_flip import SignFlipAttack
+
 from utils.log_utils import LogUtils
 from utils.model_utils import ModelUtils
 from utils.community_utils import (
@@ -94,6 +100,7 @@ class BaseNode(ABC):
     def __init__(
         self, config: Dict[str, Any], comm_utils: CommunicationManager
     ) -> None:
+        self.config = config
         self.set_constants()
         self.config = config
         self.comm_utils = comm_utils
@@ -123,6 +130,7 @@ def __init__(
 
         if "gia" in config and self.node_id in config["gia_attackers"]:
             self.gia_attacker = True
 
+        self.malicious_type = config.get("malicious_type", "normal")
         self.log_memory = config.get("log_memory", False)
 
@@ -170,7 +178,7 @@ def setup_logging(self, config: ConfigType) -> None:
     def setup_cuda(self, config: ConfigType) -> None:
         """add docstring here"""
         # Need a mapping from rank to device id
-        if (config.get("assign_based_on_host", False)):
+        if not config.get("assign_based_on_host", False):
             device_ids_map = config["device_ids"]
             node_name = f"node_{self.node_id}"
             self.device_ids = device_ids_map[node_name]
@@ -266,7 +274,7 @@ def set_shared_exp_parameters(self, config: ConfigType) -> None:
     def local_round_done(self) -> None:
         self.round += 1
 
-    def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, Any]]:
+    def get_model_weights(self, chop_model:bool=False, get_external_repr:bool=True) -> Dict[str, int|Dict[str, Any]]:
         """
         Share the model weights
         params:
@@ -275,6 +283,9 @@ def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, An
         if chop_model:
             model, _ = self.model_utils.get_split_model(self.model, self.config["split_layer"])
             model = model.state_dict()
+        elif get_external_repr and self.malicious_type != "normal":
+            # Get the external representation of the malicious model
+            model = self.get_malicious_model_weights()
         else:
             model = self.model.state_dict()
         message: Dict[str, int|Dict[str, Any]] = {"sender": self.node_id, "round": self.round, "model": model}
@@ -290,6 +301,20 @@ def get_model_weights(self, chop_model:bool=False) -> Dict[str, int|Dict[str, An
                 message["model"][key] = message["model"][key].to("cpu")
 
         return message
+
+    def get_malicious_model_weights(self) -> Dict[str, Tensor]:
+        """
+        Get the external representation of the model based on the malicious type.
+ """ + if self.malicious_type == "sign_flip": + return SignFlipAttack(self.config, self.model.state_dict()).get_representation() + elif self.malicious_type == "bad_weights": + # print("bad weights attack") + return BadWeightsAttack(self.config, self.model.state_dict()).get_representation() + elif self.malicious_type == "add_noise": + return AddNoiseAttack(self.config, self.model.state_dict()).get_representation() + else: + return self.model.state_dict() def get_local_rounds(self) -> int: return self.round @@ -1104,7 +1129,7 @@ def receive_and_aggregate_streaming(self, neighbors: List[int]) -> None: total_weight = 0.0 # To re-normalize weights after handling dropouts # Include the current node's model in the aggregation - current_model_wts = self.get_model_weights() + current_model_wts = self.get_model_weights(get_external_repr=False) # internal model representation assert "model" in current_model_wts, "Model not found in the current model." current_model_wts = current_model_wts["model"] current_weight = 1.0 / (len(neighbors) + 1) # Weight for the current node diff --git a/src/algos/fl.py b/src/algos/fl.py index a8211447..c6978ba0 100644 --- a/src/algos/fl.py +++ b/src/algos/fl.py @@ -37,52 +37,52 @@ def local_test(self, **kwargs: Any) -> Tuple[float, float, float]: return test_loss, test_acc, time_taken - def get_model_weights(self, **kwargs: Any) -> Dict[str, Any]: - """ - Overwrite the get_model_weights method of the BaseNode - to add malicious attacks - TODO: this should be moved to BaseClient - """ - - message = {"sender": self.node_id, "round": self.round} - - malicious_type = self.config.get("malicious_type", "normal") - - if malicious_type == "normal": - message["model"] = self.model.state_dict() # type: ignore - elif malicious_type == "bad_weights": - # Corrupt the weights - message["model"] = BadWeightsAttack( - self.config, self.model.state_dict() - ).get_representation() - elif malicious_type == "sign_flip": - # Flip the sign of the weights, also TODO: consider label flipping - message["model"] = SignFlipAttack( - self.config, self.model.state_dict() - ).get_representation() - elif malicious_type == "add_noise": - # Add noise to the weights - message["model"] = AddNoiseAttack( - self.config, self.model.state_dict() - ).get_representation() - else: - message["model"] = self.model.state_dict() # type: ignore - - # move the model to cpu before sending - for key in message["model"].keys(): - message["model"][key] = message["model"][key].to("cpu") - - # assert hasattr(self, 'images') and hasattr(self, 'labels'), "Images and labels not found" - if "gia" in self.config and hasattr(self, 'images') and hasattr(self, 'labels'): - # also stream image and labels - message["images"] = self.images.to("cpu") - message["labels"] = self.labels.to("cpu") - - message["random_params"] = self.random_params - for key in message["random_params"].keys(): - message["random_params"][key] = message["random_params"][key].to("cpu") + # def get_model_weights(self, **kwargs: Any) -> Dict[str, Any]: + # """ + # Overwrite the get_model_weights method of the BaseNode + # to add malicious attacks + # TODO: this should be moved to BaseClient + # """ + + # message = {"sender": self.node_id, "round": self.round} + + # malicious_type = self.config.get("malicious_type", "normal") + + # if malicious_type == "normal": + # message["model"] = self.model.state_dict() # type: ignore + # elif malicious_type == "bad_weights": + # # Corrupt the weights + # message["model"] = BadWeightsAttack( + # self.config, 
self.model.state_dict() + # ).get_representation() + # elif malicious_type == "sign_flip": + # # Flip the sign of the weights, also TODO: consider label flipping + # message["model"] = SignFlipAttack( + # self.config, self.model.state_dict() + # ).get_representation() + # elif malicious_type == "add_noise": + # # Add noise to the weights + # message["model"] = AddNoiseAttack( + # self.config, self.model.state_dict() + # ).get_representation() + # else: + # message["model"] = self.model.state_dict() # type: ignore + + # # move the model to cpu before sending + # for key in message["model"].keys(): + # message["model"][key] = message["model"][key].to("cpu") + + # # assert hasattr(self, 'images') and hasattr(self, 'labels'), "Images and labels not found" + # if "gia" in self.config and hasattr(self, 'images') and hasattr(self, 'labels'): + # # also stream image and labels + # message["images"] = self.images.to("cpu") + # message["labels"] = self.labels.to("cpu") + + # message["random_params"] = self.random_params + # for key in message["random_params"].keys(): + # message["random_params"][key] = message["random_params"][key].to("cpu") - return message # type: ignore + # return message # type: ignore def run_protocol(self): print(f"Client {self.node_id} ready to start training") diff --git a/src/configs/algo_config.py b/src/configs/algo_config.py index b84ce565..3f5e6588 100644 --- a/src/configs/algo_config.py +++ b/src/configs/algo_config.py @@ -204,13 +204,12 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st # Collaboration setup "algo": "fedstatic", "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore - # "topology": {"name": "base_graph", "max_degree": 2}, # type: ignore - "rounds": 3, + "rounds": 200, # Model parameters "optimizer": "sgd", # TODO comment out for real training "model": "resnet10", - "model_lr": 3e-4, - "batch_size": 256, + "model_lr": 0.1, # 3e-4, + "batch_size": 64, } swift: ConfigType = { @@ -231,11 +230,12 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st # comparison describes the metric or algorithm used to compare the weights of the models # sampling describes the method used to sample the neighbors after the comparison "topology": {"comparison": "weights_l2", "sampling": "closest"}, # type: ignore - "rounds": 20, + "rounds": 200, # Model parameters + "optimizer": "sgd", "model": "resnet10", - "model_lr": 3e-4, + "model_lr": 0.1, "batch_size": 256, } diff --git a/src/configs/algo_config_test.py b/src/configs/algo_config_test.py index bce2b882..48d26cce 100644 --- a/src/configs/algo_config_test.py +++ b/src/configs/algo_config_test.py @@ -3,12 +3,14 @@ fedstatic: ConfigType = { # Collaboration setup "algo": "fedstatic", - "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore - "rounds": 1, + # "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore + "topology": {"name": "ring"}, + "rounds": 200, # Model parameters + "optimizer": "sgd", "model": "resnet10", - "model_lr": 3e-4, + "model_lr": 0.1, "batch_size": 256, } diff --git a/src/configs/malicious_config.py b/src/configs/malicious_config.py index 15025d42..ebaa07bd 100644 --- a/src/configs/malicious_config.py +++ b/src/configs/malicious_config.py @@ -1,6 +1,7 @@ # Malicious Configuration from utils.types import ConfigType from typing import Dict +import random # Weight Update Attacks sign_flip: ConfigType = { @@ -49,7 +50,7 @@ label_flip: ConfigType = { "malicious_type": "label_flip", 
"permute_labels": 10, - # "permutation": random.shuffle([i for i in range(10)]), + "permutation": random.sample(range(10), 10) # Generates a random permutation of labels 0-9 } # List of Malicious node configurations @@ -60,4 +61,5 @@ "gradient_attack": gradient_attack, "backdoor_attack": backdoor_attack, "data_poisoning": data_poisoning, + "label_flip": label_flip, } diff --git a/src/configs/sys_config.py b/src/configs/sys_config.py index 60cc7104..a8907ef9 100644 --- a/src/configs/sys_config.py +++ b/src/configs/sys_config.py @@ -159,8 +159,8 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE): CIFAR10_DSET = "cifar10" CIAR10_DPATH = "./datasets/imgs/cifar10/" -NUM_COLLABORATORS = 3 -DUMP_DIR = "/tmp/new_sonar/" +NUM_COLLABORATORS = 1 +DUMP_DIR = "/mas/camera/Experiments/SONAR/jyuan/_tmp/" num_users = 9 mpi_system_config: ConfigType = { @@ -327,6 +327,7 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE): "dropout_correlation": 0.0, # correlation between dropouts of successive rounds: [0,1] } +dropout_dict = {} #empty dict to disable dropout dropout_dicts: Any = {"node_0": {}} for i in range(1, num_users + 1): dropout_dicts[f"node_{i}"] = dropout_dict @@ -346,14 +347,23 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE): "device_ids": get_device_ids(num_users, gpu_ids), "assign_based_on_host": True, # "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore - "algos": get_algo_configs(num_users=num_users, algo_configs=[fed_dynamic_weights]), # type: ignore - "samples_per_user": 500, # distributed equally + "algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore + "samples_per_user": 50000 // num_users, # distributed equally "train_label_distribution": "non_iid", "alpha_data": 0.1, "test_label_distribution": "iid", "exp_keys": [], "dropout_dicts": dropout_dicts, - "log_memory": False, + "test_samples_per_user": 200, + "log_memory": True, + "streaming_aggregation": True, # Make it true for fedstatic + # "assign_based_on_host": True, + # "hostname_to_device_ids": { + # "matlaber1": [2, 3, 4, 5, 6, 7], + # "matlaber12": [0, 1, 2, 3], + # "matlaber3": [0, 1, 2, 3], + # "matlaber4": [0, 2, 3, 4, 5, 6, 7], + # } } grpc_system_config_gia: ConfigType = { diff --git a/src/configs/sys_config_test.py b/src/configs/sys_config_test.py index 6e692146..fb927d1f 100644 --- a/src/configs/sys_config_test.py +++ b/src/configs/sys_config_test.py @@ -81,10 +81,10 @@ def get_algo_configs( CIFAR10_DSET = "cifar10" CIAR10_DPATH = "./datasets/imgs/cifar10/" -DUMP_DIR = "/tmp/" +DUMP_DIR = "/mas/camera/Experiments/SONAR/jyuan/_tmp/" -NUM_COLLABORATORS = 1 -num_users = 4 +NUM_COLLABORATORS = 36 +num_users = 36 dropout_dict = { "distribution_dict": { # leave dict empty to disable dropout @@ -101,11 +101,15 @@ def get_algo_configs( gpu_ids = [2, 3, 5, 6] +topo = "torus" +algo_name = "no_malicious" +num_collaborators = NUM_COLLABORATORS + grpc_system_config: ConfigType = { - "exp_id": "static", + "exp_id": f"topo_{topo}x{algo_name}_{0}_malicious_{num_collaborators}_colab_3_4", "num_users": num_users, "num_collaborators": NUM_COLLABORATORS, - "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:50048"]}, # The super-node + "comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["matlaber1.media.mit.edu:1112"]}, # The super-node "dset": CIFAR10_DSET, "dump_dir": DUMP_DIR, "dpath": CIAR10_DPATH, @@ -113,8 +117,7 @@ def get_algo_configs( "device_ids": 
get_device_ids(num_users, gpu_ids), # "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore "algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore - # "samples_per_user": 50000 // num_users, # distributed equally - "samples_per_user": 100, + "samples_per_user": 50000 // num_users, # distributed equally "train_label_distribution": "non_iid", "test_label_distribution": "iid", "alpha_data": 1.0, @@ -122,13 +125,12 @@ def get_algo_configs( "dropout_dicts": dropout_dicts, "test_samples_per_user": 200, "log_memory": True, - # "streaming_aggregation": True, # Make it true for fedstatic + "streaming_aggregation": True, # Make it true for fedstatic "assign_based_on_host": True, "hostname_to_device_ids": { - "matlaber1": [2, 3, 4, 5, 6, 7], - "matlaber12": [0, 1, 2, 3], - "matlaber3": [0, 1, 2, 3], - "matlaber4": [0, 2, 3, 4, 5, 6, 7], + "matlaber1": [2, 3, 4, 6, 7], + "matlaber5": [1, 2], + "matlaber12": [2, 3], } } current_config = grpc_system_config \ No newline at end of file diff --git a/src/main.py b/src/main.py index fe4aaa7b..2961d616 100644 --- a/src/main.py +++ b/src/main.py @@ -12,8 +12,8 @@ logging.basicConfig(level=logging.DEBUG) # Enable detailed logging # Default config file paths -B_DEFAULT: str = "./configs/algo_config.py" -S_DEFAULT: str = "./configs/sys_config.py" +B_DEFAULT: str = "./configs/algo_config_test.py" +S_DEFAULT: str = "./configs/sys_config_test.py" # Parse args parser : argparse.ArgumentParser = argparse.ArgumentParser(description="Run collaborative learning experiments") diff --git a/src/utils/communication/grpc/main.py b/src/utils/communication/grpc/main.py index 770f37eb..092ab821 100644 --- a/src/utils/communication/grpc/main.py +++ b/src/utils/communication/grpc/main.py @@ -242,7 +242,7 @@ def recv_with_retries(self, host: str, callback: Callable[[comm_pb2_grpc.Communi ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH), ]) as channel: stub = comm_pb2_grpc.CommunicationServerStub(channel) - max_tries = 10 + max_tries = 1000000 while max_tries > 0: try: result = callback(stub) @@ -365,7 +365,7 @@ def get_host_from_rank(self, rank: int) -> str: def send_with_retries(self, dest_host: str, buffer: Any) -> Any: with grpc.insecure_channel(dest_host) as channel: # type: ignore stub = comm_pb2_grpc.CommunicationServerStub(channel) # type: ignore - max_tries = 10 + max_tries = 1000000 while max_tries > 0: try: model = comm_pb2.Model(buffer=buffer) # type: ignore diff --git a/src/utils/log_utils.py b/src/utils/log_utils.py index 51a88681..564e7d63 100644 --- a/src/utils/log_utils.py +++ b/src/utils/log_utils.py @@ -83,6 +83,7 @@ def copy_source_code(config: ConfigType) -> None: "./toy_exp_ml/", "./toy_exp.py", "./toy_exp_ml.py", + "./node_modules/", "/".join(path.split("/")[:-1]) + "/", ] folders = glob(r"./*/") diff --git a/src/utils/model_utils.py b/src/utils/model_utils.py index 41f4146d..a537f403 100644 --- a/src/utils/model_utils.py +++ b/src/utils/model_utils.py @@ -88,7 +88,7 @@ def train( model, optim, dloader, loss_fn, device, test_loader=None, **kwargs ) return mean_loss, acc - elif self.malicious_type == "backdoor_attack" or self.malicious_type == "gradient_attack": + elif self.malicious_type == "backdoor_attack" or self.malicious_type == "gradient_attack" or self.malicious_type == "label_flip": train_loss, acc = self.train_classification_malicious( model, optim, dloader, loss_fn, device, test_loader=None, **kwargs ) @@ -308,9 +308,11 @@ def train_classification_malicious( 
             loss.backward()
         elif self.malicious_type == "label_flip":
             # permutation = torch.tensor(self.config.get("permutation", [i for i in range(10)]))
-            permute_labels = self.config.get("permute_labels", 10)
-            permutation = torch.randperm(permute_labels)
-            permutation = permutation.to(target.device)
+            print("flipping labels")
+            # permute_labels = self.config.get("permute_labels", 10)
+            # permutation = torch.randperm(permute_labels)
+            permutation = self.config.get("permutation")
+            permutation = torch.tensor(permutation, dtype=torch.long, device=target.device)
             target = permutation[target]  # flipped targets
             loss = loss_fn(output, target)
diff --git a/src/utils/plot_combined_exp.py b/src/utils/plot_combined_exp.py
index 6b0fd2a0..ff9830d6 100644
--- a/src/utils/plot_combined_exp.py
+++ b/src/utils/plot_combined_exp.py
@@ -40,7 +40,9 @@ def combine_and_plot(
         # Load the aggregated metric DataFrame for the experiment
         metric_df = pd.read_csv(os.path.join(experiment_path, f"{metric_name}_avg.csv"))
         # TODO: modify this to be CI
-        # std_df = pd.read_csv(os.path.join(experiment_path, f"{metric_name}_std.csv"))
+        std_df = pd.read_csv(os.path.join(experiment_path, f"{metric_name}_std.csv"))
+        n = 36
+        # ci_df = 1.96 * (std_df / (n ** 0.5))
         ci_df = pd.read_csv(os.path.join(experiment_path, f"{metric_name}_ci95.csv"))
 
         # Assuming rounds or time steps are the index
@@ -96,18 +98,38 @@
 
 if __name__ == "__main__":
     # /mas/camera/Experiments/SONAR/jyuan/4_attack_scaling/cifar10_40users_1250_bad_weights_0_malicious_seed1/logs/plots/test_acc_avg.csv
-    base_dir = "/mas/camera/Experiments/SONAR/jyuan/4_attack_scaling"
-    exp_names = ["bad_weights", "data_poisoning", "gradient_attack", "label_flip"]
-    plot_names = ["Bad Weights Attack", "Data Poisoning Attack", "Gradient Attack", "Label Flip Attack"]
-    num_malicious = [0, 1, 4, 8, 12]
-    output_dir = "/mas/camera/Experiments/SONAR/jyuan/4_attack_scaling/plots/"
+
+    # base_dir = "/mas/camera/Experiments/SONAR/jyuan/4_attack_scaling"
+    # exp_names = ["bad_weights", "data_poisoning", "gradient_attack", "label_flip"]
+    # plot_names = ["Bad Weights Attack", "Data Poisoning Attack", "Gradient Attack", "Label Flip Attack"]
+    # num_malicious = [0, 1, 4, 8, 12]
+    # output_dir = "/mas/camera/Experiments/SONAR/jyuan/4_attack_scaling/plots/"
+
+    # for exp_ind, exp_name in enumerate(exp_names):
+    #     experiment_map = {}
+    #     for num_mal in num_malicious:
+    #         experiment_map[f"{num_mal}_malicious"] = os.path.join(base_dir, f"cifar10_40users_1250_{exp_name}_{num_mal}_malicious_seed1/logs/plots/")
+    #     metrics_list = ["test_acc"]
+    #     plot_titles = [f"{plot_names[exp_ind]}: Test Accuracy Over Time"]
+    #     xlabels = ["Rounds"]
+    #     ylabels = ["Accuracy"]
+    #     combine_and_plot(exp_name, experiment_map, metrics_list, plot_titles, xlabels, ylabels, output_dir)
+
+    base_dir = "/mas/camera/Experiments/SONAR/jyuan/8_many_colab/bad_weights/"
+    exp_names = ["0_mal", "1_mal", "4_mal"]
+    plot_names = ["No Malicious", "1 Malicious Bad Weights", "4 Malicious Bad Weights"]
+    topologies = ["ring", "torus", "fully_connected", "erdos_renyi", "el", "one_peer_exponential", "base_graph", "dynamic"]
+    output_dir = "/mas/camera/Experiments/SONAR/jyuan/8_many_colab/bad_weights"
 
     for exp_ind, exp_name in enumerate(exp_names):
-        experiment_map = {}
-        for num_mal in num_malicious:
-            experiment_map[f"{num_mal}_malicious"] = os.path.join(base_dir, f"cifar10_40users_1250_{exp_name}_{num_mal}_malicious_seed1/logs/plots/")
-        metrics_list = ["test_acc"]
-        plot_titles = [f"{plot_names[exp_ind]}: Test Accuracy Over Time"]
-        xlabels = ["Rounds"]
-        ylabels = ["Accuracy"]
-        combine_and_plot(exp_name, experiment_map, metrics_list, plot_titles, xlabels, ylabels, output_dir)
\ No newline at end of file
+        for attacks in ["bad_weights"]:
+            experiment_map = {}
+            for topo in topologies:
+                if topo in ["base_graph", "dynamic", "one_peer_exponential", "el"]:
+                    experiment_map[f"{topo}"] = os.path.join(base_dir, f"cifar10_36users_1388_topo_{topo}x{attacks}_{exp_name}icious_3_5_seed2/logs/plots/")
+                else:
+                    experiment_map[f"{topo}"] = os.path.join(base_dir, f"cifar10_36users_1388_topo_{topo}x{attacks}_{exp_name}icious_36_colab_3_5_seed2/logs/plots/")
+            metrics_list = ["test_acc"]
+            plot_titles = [f"{plot_names[exp_ind]}: Test Accuracy Over Time"]
+            xlabels = ["Rounds"]
+            ylabels = ["Accuracy"]
+            combine_and_plot(f"{attacks}_{exp_name}", experiment_map, metrics_list, plot_titles, xlabels, ylabels, output_dir, include_logs=False)
\ No newline at end of file
diff --git a/src/utils/post_hoc_plot_utils.py b/src/utils/post_hoc_plot_utils.py
index b53cbc53..bd2cfc20 100644
--- a/src/utils/post_hoc_plot_utils.py
+++ b/src/utils/post_hoc_plot_utils.py
@@ -56,11 +56,20 @@ def compute_per_user_metrics(node_id: str, logs_dir: str) -> Dict[str, float]:
         'best_train_acc': best_accuracy(train_acc, 'train_acc'),
         'best_test_acc': best_accuracy(test_acc, 'test_acc'),
         'best_train_loss': best_loss(train_loss, 'train_loss'),
-        'best_test_loss': best_loss(test_loss, 'test_loss')
+        'best_test_loss': best_loss(test_loss, 'test_loss'),
     }
 
+    # check if bytes_received exists
+    try:
+        bytes_received = load_logs(node_id, 'bytes_received', logs_dir)
+        metrics['avg_bytes_received'] = bytes_received['bytes_received'].mean()
+        metrics['std_bytes_received'] = bytes_received['bytes_received'].std()
+    except Exception:
+        pass
+
     return metrics
 
+
 def aggregate_metrics_across_users(logs_dir: str, output_dir: Optional[str] = None) -> Tuple[pd.Series, pd.Series, pd.DataFrame]:
     """Aggregate metrics across all users, categorize nodes, and save the results to CSV files."""
     nodes = get_all_nodes(logs_dir)
@@ -623,28 +632,29 @@ def aggregate_neighbors_across_users(logs_dir: str) -> np.ndarray:
     return np.array(all_users_neighbors).T
 
 # Use if you a specific experiment folder
-# if __name__ == "__main__":
-#     # Define the path where your experiment logs are saved
-#     logs_dir = '/mas/camera/Experiments/SONAR/jyuan/experiment/logs_sample_time_elapsed/'
-#     avg_metrics, std_metrics, df_metrics = aggregate_metrics_across_users(logs_dir)
-#     plot_all_metrics(logs_dir, per_round=True, per_time=True, plot_avg_only=True)
+if __name__ == "__main__":
+    # Define the path where your experiment logs are saved
+    logs_dir = '/mas/camera/Experiments/SONAR/jyuan/8_many_colab/cifar10_36users_1388_topo_ringxbad_weights_0_malicious_1_colab_3_5_seed2/logs/'
+    avg_metrics, std_metrics, df_metrics = aggregate_metrics_across_users(logs_dir)
+    print("hello world")
+    plot_all_metrics(logs_dir, per_round=True, per_time=True, plot_avg_only=True)
 
 # Use if you want to compute for multiple experiment folders
-if __name__ == "__main__":
-    # Define the base directory where your experiment logs are saved
-    base_logs_dir = '/mas/camera/Experiments/SONAR/abhi/'
-
-    # Iterate over each subdirectory in the base directory
-    for experiment_folder in os.listdir(base_logs_dir):
-        experiment_path = os.path.join(base_logs_dir, experiment_folder)
-        logs_dir = os.path.join(experiment_path, 'logs')
-
-        if os.path.isdir(logs_dir):
-            try:
-                print(f"Processing logs in: {logs_dir}")
-                avg_metrics, std_metrics, df_metrics = aggregate_metrics_across_users(logs_dir)
-                plot_all_metrics(logs_dir, per_round=True, per_time=True, plot_avg_only=True)
-            except Exception as e:
-                print(f"Error processing {logs_dir}: {e}")
-                continue
\ No newline at end of file
+# if __name__ == "__main__":
+#     # Define the base directory where your experiment logs are saved
+#     base_logs_dir = '/mas/camera/Experiments/SONAR/jyuan/8_many_colab/'
+
+#     # Iterate over each subdirectory in the base directory
+#     for experiment_folder in os.listdir(base_logs_dir):
+#         experiment_path = os.path.join(base_logs_dir, experiment_folder)
+#         logs_dir = os.path.join(experiment_path, 'logs')
+
+#         if os.path.isdir(logs_dir):
+#             try:
+#                 print(f"Processing logs in: {logs_dir}")
+#                 avg_metrics, std_metrics, df_metrics = aggregate_metrics_across_users(logs_dir)
+#                 plot_all_metrics(logs_dir, per_round=True, per_time=True, plot_avg_only=True)
+#             except Exception as e:
+#                 print(f"Error processing {logs_dir}: {e}")
+#                 continue
\ No newline at end of file
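
For anyone replicating the label-flip behaviour wired up in this patch (malicious_config.py now supplies a fixed permutation via random.sample, and train_classification_malicious indexes into it), here is a minimal, self-contained sketch of that lookup step. It is illustrative only: the tensor names and the example permutation below are placeholders, not identifiers from this repository.

    # Sketch of the label-flip lookup used in train_classification_malicious.
    # The permutation is sampled once when the config module is imported
    # (random.sample(range(10), 10)), so a malicious node flips labels the
    # same way on every batch and every round.
    import torch

    permutation_list = [3, 7, 0, 9, 1, 5, 8, 2, 6, 4]   # placeholder permutation of the 10 classes
    targets = torch.tensor([0, 1, 2, 9])                 # original labels for one batch
    perm = torch.tensor(permutation_list, dtype=torch.long, device=targets.device)
    flipped = perm[targets]                              # advanced indexing: tensor([3, 7, 0, 4])

Moving the permutation from a per-batch torch.randperm call into the config is what makes the attack consistent: the same label mapping is applied on every batch and round instead of a fresh random mapping each time.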