diff --git a/simulatingrisk/hawkdove/README.md b/simulatingrisk/hawkdove/README.md new file mode 100644 index 0000000..9287125 --- /dev/null +++ b/simulatingrisk/hawkdove/README.md @@ -0,0 +1,106 @@ +# Hawk-Dove with risk attitudes + +Hawk/Dove game with risk attitudes + +## Game description + +This is a variant of the Hawk/Dove Game: https://en.wikipedia.org/wiki/Chicken_(game) + +| | H | D| +|-|-|-| +| H | 0, 0 | 3, 1| +| D |1, 3| 2, 2| + +BACKGROUND: An unpublished paper by Simon Blessenohl shows that the equilibrium in this game is different for EU maximizers than for REU maximizers (all with the same risk-attitude), and that REU maximizers do better as a population (basically, play DOVE more often) + +We want to know: what happens when different people have _different_ risk-attitudes. +(See also variant simulation [Hawk/Dove game with multiple risk attitudes](../hawkdovemulti/). ) + +GAME: Hawk-Dove with risk-attitudes + +Players arranged on a lattice [options for both 4 neighbors (AYBD) and 8 neighbors (XYZABCDE)] + +| | | | +|-|-|-| +| X | Y |Z | +|A | **I** | B | +| C | D | E | + +- Payoffs are determined as follows: + - Look at what each neighbor did, then: + - If I play HAWK and neighbor plays DOVE: 3 + - If I play DOVE and neighbor plays DOVE: 2 + - If I play DOVE and neighbor plays HAWK: 1 + - If I play HAWK and neighbor plays HAWK: 0 + +Each player on a lattice (grid in Mesa): +- Has parameter $r$ [from 0 to 9] +- Let `d` be the number of neighbors who played DOVE during the previous round. If $d >= r$, then play HAWK. Otherwise play DOVE. (Agents who are risk-avoidant only play HAWK if there are a lot of doves around them. More risk-avoidance requires a higher number of doves to get an agent to play HAWK.) +- The proportion of neighbors who play DOVE corresponds to your probability of encountering a DOVE when playing a randomly-selected neighbor. The intended interpretation is that you maximize REU for this probability of your opponent playing DOVE. Thus, $r$ corresponds to the probability above which playing HAWK maximizes REU. + - Choice of play for the first round: + - Who is a HAWK and who is a DOVE is randomly determined; proportion set at the beginning of each simulation. E.g. 30% are HAWKS; if we have 100 players, then each player has a 30% chance of being HAWK; + - This initial parameter is called HAWK-ODDS; default is 50/50 + + +## Payoffs and risk attitudes + +This game has a discrete set of options instead of probability, so instead of defining `r` as a value between 0.0 and 1.0, we use discrete values based on the choices. For the game that includes diagonal neighbors when agents play all neighbors: + + + + + + + + + + + + + + + + + + + + + + +
| r | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|---|
| Plays H when: | always | $\geq1$ D | $\geq2$ D | $\geq3$ D | $\geq4$ D | $\geq5$ D | $\geq6$ D | $\geq7$ D | $\geq8$ D | never |
| | risk seeking | | | | EU maximizer (risk neutral) | EU maximizer (risk neutral) | risk avoidant | | | |
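To make the rule concrete, here is a small illustrative sketch of the per-round payoff calculation; the simulation's own version lives in `HawkDoveAgent.payoff` and `play` in `simulatingrisk/hawkdove/model.py`, and the helper below is only for exposition:

```python
# Illustrative sketch only: payoff values from the game description above.
PAYOFF = {
    ("HAWK", "DOVE"): 3,  # I play hawk, neighbor plays dove
    ("DOVE", "DOVE"): 2,
    ("DOVE", "HAWK"): 1,
    ("HAWK", "HAWK"): 0,
}

def round_payoff(my_play, neighbor_plays):
    """Points gained in one round: play against every neighbor and sum the payoffs."""
    return sum(PAYOFF[(my_play, play)] for play in neighbor_plays)

# playing HAWK against one hawk and two doves earns 0 + 3 + 3 = 6 points
assert round_payoff("HAWK", ["HAWK", "DOVE", "DOVE"]) == 6
```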
+ + +An REU maximizer will play HAWK when +```math +r(p) > \frac{(D,H)-(H,H)}{(H,D)-(D,D)} +``` +In other words, when $r(p) > 0.52$. An EU maximizer, with $r(p) = p$, will play HAWK when $p > 0.52$, e.g., when more than 4 out of 8 neighbors play DOVE. Thus, $r = 4$ corresponds to risk-neutrality (EU maximization), $r < 4$ corresponds to risk-inclination, and $r > 4$ corresponds to risk-avoidance. + +Payoffs were chosen to avoid the case in which two choices had equal expected utility for some number of neighbors. For example, if the payoff of $(D,D)$ was $(2,2)$, then at $p = 0.5$ (4 of 8 neighbors), then EU maximizers would be indifferent between HAWK and DOVE; in this case, no r-value would correspond to EU maximization, since $r = 4$ strictly prefers DOVE and $r = 3$ strictly prefers HAWK. + +Another way to visualize the risk attitudes and choices in this game is this table, which shows when agents will play Hawk or Dove based on their risk attitudes (going down on the left side) and the number of neighbors playing Dove (across the top). + + + + + + + + + + + + + + +
Columns: # of neighbors playing DOVE.

| | r | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
|---|---|---|---|---|---|---|---|---|---|---|
| risk seeking | 0 | H | H | H | H | H | H | H | H | H |
| | 1 | D | H | H | H | H | H | H | H | H |
| | 2 | D | D | H | H | H | H | H | H | H |
| | 3 | D | D | D | H | H | H | H | H | H |
| neutral | 4 | D | D | D | D | H | H | H | H | H |
| | 5 | D | D | D | D | D | H | H | H | H |
| risk avoidant | 6 | D | D | D | D | D | D | H | H | H |
| | 7 | D | D | D | D | D | D | D | H | H |
| | 8 | D | D | D | D | D | D | D | D | H |
| | 9 | D | D | D | D | D | D | D | D | D |
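This table follows directly from the decision rule above (play HAWK when the number of neighbors who played DOVE meets or exceeds $r$) and can be regenerated with a few lines of Python. The sketch below is illustrative only; the model's `HawkDoveAgent.choose` applies the same comparison, using a proportionally scaled dove count when the observed neighborhood size differs from the risk-level range:

```python
# Regenerate the Hawk/Dove table above from the "d >= r: play HAWK" rule.
for r in range(10):                    # risk levels 0-9
    row = ["H" if doves >= r else "D"  # HAWK once enough neighbors played DOVE
           for doves in range(9)]      # 0-8 neighbors playing DOVE last round
    print(f"r={r}: {' '.join(row)}")
```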
+ +## Convergence + +The model is configured to stop automatically when it has stabilized. Convergence is based on a stable rolling average of the percent of agents in the simulation playing hawk. + +A rolling average of the percent of agents playing hawk is calculated every round based on the percent for the last **30** rounds. The rolling average is not calculated until after at least **15** rounds. + +When we have collected the rolling average for at least **15** rounds and the last **30** rolling averages are the same when rounded to 2 percentage points, we consider the simulation converged. + + diff --git a/simulatingrisk/hawkdove/model.py b/simulatingrisk/hawkdove/model.py new file mode 100644 index 0000000..35e80f8 --- /dev/null +++ b/simulatingrisk/hawkdove/model.py @@ -0,0 +1,375 @@ +from enum import Enum +from collections import deque +import math +import statistics + +import mesa + +from simulatingrisk.utils import coinflip + +Play = Enum("Play", ["HAWK", "DOVE"]) +play_choices = [Play.HAWK, Play.DOVE] + + +# divergent color scheme, ten colors +# from https://colorbrewer2.org/?type=diverging&scheme=RdYlGn&n=10 +divergent_colors_10 = [ + "#a50026", + "#d73027", + "#f46d43", + "#fdae61", + "#fee08b", + "#d9ef8b", + "#a6d96a", + "#66bd63", + "#1a9850", + "#006837", +] + +# divergent color scheme, five colors +# from https://colorbrewer2.org/?type=diverging&scheme=RdYlGn&n=5 +divergent_colors_5 = ["#d7191c", "#fdae61", "#ffffbf", "#a6d96a", "#1a9641"] + + +class HawkDoveAgent(mesa.Agent): + """ + An agent with a risk attitude playing Hawk or Dove + """ + + def __init__(self, unique_id, model, hawk_odds=None): + super().__init__(unique_id, model) + + self.points = 0 + self.choice = self.initial_choice(hawk_odds) + self.last_choice = None + + # risk level must be set by base class, since initial + # conditions are specific to single / variable risk games + self.set_risk_level() + + def set_risk_level(self): + raise NotImplementedError + + def __repr__(self): + return ( + f"<{self.__class__.__name__} id={self.unique_id} " + + f"r={self.risk_level} points={self.points}>" + ) + + def initial_choice(self, hawk_odds=None): + # first round : choose what to play randomly or based on initial hawk odds + opts = {} + if hawk_odds is not None: + opts["weight"] = hawk_odds + return coinflip(play_choices, **opts) + + @property + def choice_label(self): + return "hawk" if self.choice == Play.HAWK else "dove" + + def get_neighbors(self, size): + """get all neighbors for a supported neighborhood size""" + check_neighborhood_size(size) + # 4 and 8 neighborhood use default radius 1 + # 8 and 24 both use moore neighborhood (includes diagonals) + opts = {"moore": True} + if size == 4: + # use von neumann neighborhood instead of moore (no diagonal) + opts["moore"] = False + + # for 24 size neighborhood, use radius 2 + if size == 24: + opts["radius"] = 2 + + return self.model.grid.get_neighbors(self.pos, include_center=False, **opts) + + @property + def play_neighbors(self): + """neighbors to play against, based on model play neighborhood size""" + return self.get_neighbors(self.model.play_neighborhood) + + @property + def observed_neighbors(self): + """neighbors to look at when deciding what to play; + based on model observed neighborhood size""" + return self.get_neighbors(self.model.observed_neighborhood) + + @property + def num_dove_neighbors(self): + """count how many neighbors played DOVE on the last round + (uses `observed_neighborhood` size from model)""" + return len([n for n in 
self.observed_neighbors if n.last_choice == Play.DOVE]) + + @property + def proportional_num_dove_neighbors(self): + """adjust the number of dove neighbors based on ratio between + play neighborhood and observed neighborhood, to scale observations + to the range of agent risk level.""" + ratio = self.model.max_risk_level / self.model.observed_neighborhood + # always round to an integer + return round(ratio * self.num_dove_neighbors) + + def choose(self): + "decide what to play this round" + # first choice is random since we don't have any information + # about neighbors' choices + if self.model.schedule.steps == 0: + return + + # after the first round, choose based on what neighbors did last time + + # choose based on the number of neighbors who played + # dove last round and agent risk level + + # agent with r = 0 should always take the risky choice + # (any risk is acceptable). + # agent with r = max should always take the safe option + # (no risk is acceptable) + if self.proportional_num_dove_neighbors >= self.risk_level: + choice = Play.HAWK + else: + choice = Play.DOVE + + # based on model configuration, should agent play randomly instead? + if self.model.random_play_odds and coinflip( + [True, False], weight=self.model.random_play_odds + ): + # if a random play is selected, flip a coin between hawk and dove + choice = coinflip([Play.HAWK, Play.DOVE]) + + self.choice = choice + + def play(self): + # play against each neighbor and calculate cumulative payoff + payoff = 0 + for n in self.play_neighbors: + payoff += self.payoff(n) + # update total points based on payoff this round + self.points += payoff + + # store this round's choice as previous choice + self.last_choice = self.choice + + def payoff(self, other): + """ + If I play HAWK and neighbor plays DOVE: 3 + If I play DOVE and neighbor plays DOVE: 2 + If I play DOVE and neighbor plays HAWK: 1 + If I play HAWK and neighbor plays HAWK: 0 + """ + if self.choice == Play.HAWK: + if other.choice == Play.DOVE: + return 3 + if other.choice == Play.HAWK: + return 0 + elif self.choice == Play.DOVE: + if other.choice == Play.DOVE: + return 2 + if other.choice == Play.HAWK: + return 1 + + @property + def points_rank(self): + if self.points: + return math.floor(self.points / self.model.max_agent_points * 10) + return 0 + + +class HawkDoveModel(mesa.Model): + """ + Model for hawk/dove game with risk attitudes. 
+ + :param grid_size: number for square grid size (creates n*n agents) + :param play_neighborhood: size of neighborhood each agent plays + against; 4, 8, or 24 (default: 8) + :param observed_neighborhood: size of neighborhood each agent looks + at when choosing what to play; 4, 8, or 24 (default: 8) + :param hawk_odds: odds for playing hawk on the first round (default: 0.5) + :param risk_adjustment: strategy agents should use for adjusting risk; + None (default), adopt, or average + :param adjust_every: when risk adjustment is enabled, adjust every + N rounds (default: 10) + """ + + #: whether the simulation is running + running = True # required for batch run + #: readable status (running/converged) + status = "running" + + #: size of deque/fifo for recent values + rolling_window = 30 + #: minimum size before calculating rolling average + min_window = 15 + #: class to use when initializing agents + agent_class = HawkDoveAgent + #: supported neighborhood sizes + neighborhood_sizes = {4, 8, 24} + #: minimum risk level + min_risk_level = 0 + #: maximum risk level allowed + max_risk_level = 9 + + def __init__( + self, + grid_size, + play_neighborhood=8, + observed_neighborhood=8, + hawk_odds=0.5, + random_play_odds=0.00, + ): + super().__init__() + # assume a fully-populated square grid + self.num_agents = grid_size * grid_size + for nsize in [play_neighborhood, observed_neighborhood]: + check_neighborhood_size(nsize) + + self.play_neighborhood = play_neighborhood + self.observed_neighborhood = observed_neighborhood + + # distribution of first choice (50/50 by default) + self.hawk_odds = hawk_odds + # how often should agents make a random play + self.random_play_odds = random_play_odds + + # create fifos to track recent behavior to detect convergence + self.recent_percent_hawk = deque([], maxlen=self.rolling_window) + self.recent_rolling_percent_hawk = deque([], maxlen=self.rolling_window) + + # initialize a single grid (each square inhabited by a single agent); + # configure the grid to wrap around so everyone has neighbors + self.grid = mesa.space.SingleGrid(grid_size, grid_size, True) + self.schedule = mesa.time.StagedActivation(self, ["choose", "play"]) + + # initialize all agents + agent_opts = self.new_agent_options() + for i in range(self.num_agents): + # add to scheduler and place randomly in an empty spot + agent = self.agent_class(i, self, **agent_opts) + self.schedule.add(agent) + self.grid.move_to_empty(agent) + + self.datacollector = mesa.DataCollector(**self.get_data_collector_options()) + + def get_data_collector_options(self): + # method to return options for data collection, + # so subclasses can modify + return { + "model_reporters": { + "max_agent_points": "max_agent_points", + "percent_hawk": "percent_hawk", + "rolling_percent_hawk": "rolling_percent_hawk", + "status": "status", + # explicitly track total agents, instead of inferring from grid size + "total_agents": "num_agents", + }, + "agent_reporters": { + "risk_level": "risk_level", + "choice": "choice_label", + "points": "points", + }, + } + + def new_agent_options(self): + # generate and return a dictionary with common options + # for initializing all agents + return {"hawk_odds": self.hawk_odds} + + def step(self): + """ + A model step. 
Used for collecting data and advancing the schedule + """ + self.schedule.step() + # check if simulation has converged and should stop running + if self.converged: + self.status = "converged" + self.running = False + + # collect data after status is updated, so data collected + # for last round will reflect converged status + self.datacollector.collect(self) + + @property + def max_agent_points(self): + # what is the current largest point total of any agent? + return max([a.points for a in self.schedule.agents]) + + @property + def percent_hawk(self): + # what percent of agents chose hawk? + hawks = [a for a in self.schedule.agents if a.choice == Play.HAWK] + phawk = len(hawks) / self.num_agents + # add to recent values + self.recent_percent_hawk.append(phawk) + return phawk + + @property + def rolling_percent_hawk(self): + # make sure we have enough values to check + if len(self.recent_percent_hawk) > self.min_window: + rolling_phawk = statistics.mean(self.recent_percent_hawk) + # add to recent values + self.recent_rolling_percent_hawk.append(rolling_phawk) + return rolling_phawk + + @property + def converged(self): + # check if the simulation is stable and should stop running + # calculating based on rolling percent hawk; when this is stable + # within our rolling window, return true + # - currently checking for single value; + # could allow for a small amount variation if necessary + + # in variable risk with risk adjustment, numbers are not strictly equal + # but do get close and fairly stable; round to two digits before comparing + rounded_set = set([round(x, 2) for x in self.recent_rolling_percent_hawk]) + return ( + len(self.recent_rolling_percent_hawk) > self.min_window + and len(rounded_set) == 1 + ) + + +def check_neighborhood_size(size): + # neighborhood size check, shared by model and agent + if size not in HawkDoveModel.neighborhood_sizes: + raise ValueError( + f"{size} is not a supported neighborhood size; " + + f"must be one of {HawkDoveModel.neighborhood_sizes}" + ) + + +class HawkDoveSingleRiskAgent(HawkDoveAgent): + """ + An agent with a risk attitude playing Hawk or Dove; must be initialized + with a risk level + """ + + def set_risk_level(self): + self.risk_level = self.model.agent_risk_level + + +class HawkDoveSingleRiskModel(HawkDoveModel): + """hawk/dove simulation where all agents have the same risk atttitude. + Adds a required `agent_risk_level` parameter; supports all + parameters in :class:`HawkDoveModel`. + """ + + #: class to use when initializing agents + agent_class = HawkDoveSingleRiskAgent + + risk_attitudes = "single" + + def __init__(self, grid_size, agent_risk_level, *args, **kwargs): + if ( + agent_risk_level > self.max_risk_level + or agent_risk_level < self.min_risk_level + ): + raise ValueError( + f"Agent risk level {agent_risk_level} is out of range; must be between " + + f"{self.min_risk_level} - {self.max_risk_level}" + ) + + # store agent risk level + self.agent_risk_level = agent_risk_level + + # pass through options and initialize base class + super().__init__(grid_size, *args, **kwargs) diff --git a/simulatingrisk/hawkdovemulti/README.md b/simulatingrisk/hawkdovemulti/README.md new file mode 100644 index 0000000..a6bac8f --- /dev/null +++ b/simulatingrisk/hawkdovemulti/README.md @@ -0,0 +1,77 @@ +# Hawk-Dove with multiple risk attitudes + +This is a variation of the [Hawk/Dove game with risk attitudes](../hawkdove/). 
+This version adds multiple risk attitudes, with options for updating +risk attitudes periodically based on comparing success of neighboring agents. + +The basic mechanics of the game are the same. This model adds options +for agent risk adjustment (none, adopt, average) and period of risk +adjustment (by default, every ten rounds). The payoff used to compare +agents when adjusting risk attitudes can either be recent (since the +last adjustment round) or total points for the whole game. The +adjustment neighborhood, or which neighboring agents are considered +when adjusting risk attitudes, can be configured to 4, 8, or 24. + +Initial risk attitudes are set by the model. Risk distribution can +be configured to use a normal distribution, uniform (random), bimodal, +skewed left, or skewed right. + +Like the base hawk/dove risk attitude game, there is also a +configuration to add some chance of agents playing hawk/dove randomly +instead of choosing based on the rules of the game. + +## Convergence + +The model is configured to stop automatically when it has stabilized. +Convergence is reached when an adjustment round occurs and zero agents +adjust their risk attitude. + +If adjustment is not enabled, convergence logic falls back to the +implementation of the hawk/dove single-risk attitude simulation, which is +based on a stable rolling % average of agents playing hawk. + +Model and agent data collection also includes reports on whether agents +updated their risk level in the last adjustment round, and model data collection +includes a status of "running" or "converged". + +## Batch running + +This module includes a custom batch run script to run the simulation and +collect data across a large combination of parameters and generate data +files with collected model and agent data. + +To run the script locally from the root project directory: +```sh +simulatingrisk/hawkdovemulti/batch_run.py +``` +Use `-h` or `--help` to see options. + +If this project has been installed with pip or similar, the script is +available as `simrisk-hawkdovemulti-batchrun`. + +To run the batch run script on an HPC cluster: + +- Create a conda environment and install dependencies and this project. + (Major mesa dependencies available with conda are installed first as + conda packages) + +```sh +module load anaconda3/2023.9 +conda create --name simrisk pandas networkx matplotlib numpy tqdm click +conda activate simrisk +pip install git+https://github.com/Princeton-CDH/simulating-risk.git@hawkdove-batchrun +``` +For convenience, an example [slurm batch script](simrisk_batch.slurm) is +included for running the batch run script (some portions are +specific to Princeton's Research Computing HPC environment.) + +- Customize the slurm batch script as desired, copy it to the cluster, and submit +the job: `sbatch simrisk_batch.slurm` + +By default, the batch run script will use all available processors, and will +create model and agent data files under a `data/hawkdovemulti/` directory +relative to the working directory where the script is called. 
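For each parameter combination, the batch run script essentially constructs a
`HawkDoveMultipleRiskModel` and steps it until it converges or reaches the step
cap. A minimal sketch of the equivalent direct usage (the parameter values
below are only examples):

```python
from simulatingrisk.hawkdovemulti.model import HawkDoveMultipleRiskModel

# example parameters; see model.py for the full list and defaults
model = HawkDoveMultipleRiskModel(
    10,                           # 10x10 grid
    risk_distribution="bimodal",
    risk_adjustment="adopt",
    adjust_every=10,
)
max_steps = 1000
while model.running and model.schedule.steps <= max_steps:
    model.step()

print(model.schedule.steps, model.status, model.population_risk_category)
```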
+ + + + diff --git a/simulatingrisk/hawkdovemulti/batch_run.py b/simulatingrisk/hawkdovemulti/batch_run.py new file mode 100755 index 0000000..40c1b8e --- /dev/null +++ b/simulatingrisk/hawkdovemulti/batch_run.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python + +import argparse +import csv +from datetime import datetime +import multiprocessing +import os + +from tqdm.auto import tqdm + +from mesa.batchrunner import _make_model_kwargs, _collect_data + +from simulatingrisk.hawkdovemulti.model import HawkDoveMultipleRiskModel + + +neighborhood_sizes = list(HawkDoveMultipleRiskModel.neighborhood_sizes) + +# NOTE: it's better to be explicit about even parameters +# instead of relying on model defaults, because +# parameters specified here are included in data exports + + +# combination of parameters we want to run +params = { + "default": { + "grid_size": [10, 25, 50], # 100], + "risk_adjustment": ["adopt", "average"], + "play_neighborhood": neighborhood_sizes, + "observed_neighborhood": neighborhood_sizes, + "adjust_neighborhood": neighborhood_sizes, + "hawk_odds": [0.5, 0.25, 0.75], + "adjust_every": [2, 10, 20], + "risk_distribution": HawkDoveMultipleRiskModel.risk_distribution_options, + "adjust_payoff": HawkDoveMultipleRiskModel.supported_adjust_payoffs, + # random? + }, + # specific scenarios to allow paired statistical tests + "risk_adjust": { + # ary risk adjustment + "risk_adjustment": ["adopt", "average"], + "risk_distribution": "uniform", + # use model defaults; grid size must be specified + "grid_size": 10, # 25, + }, + "payoff": { + "adjust_payoff": HawkDoveMultipleRiskModel.supported_adjust_payoffs, + "risk_distribution": "uniform", + # use model defaults; grid size must be specified + "grid_size": 25, + }, + "distribution": { + "risk_distribution": HawkDoveMultipleRiskModel.risk_distribution_options, + # adopt tends to converge faster; LB also says it's more interesting & simpler + "risk_adjustment": "adopt", + # use model defaults; grid size must be specified + "grid_size": 10, + }, +} + + +# method for multiproc running model with a set of params +def run_hawkdovemulti_model(args): + run_id, iteration, params, max_steps = args + # simplified model runner adapted from mesa batch run code + + model = HawkDoveMultipleRiskModel(**params) + while model.running and model.schedule.steps <= max_steps: + try: + model.step() + # by default, signals propagate to all processes + # take advantage of that to exit and save results + except KeyboardInterrupt: + # if we get a ctrl-c / keyboard interrupt, stop looping + # and finish data collection to report on whatever was completed + break + + # collect data for the last step + # (scheduler is 1-based index but data collection is 0-based) + step = model.schedule.steps - 1 + + model_data, all_agents_data = _collect_data(model, step) + + # combine run id, step, and params, with collected model data + run_data = {"RunId": run_id, "iteration": iteration, "Step": step} + run_data.update(params) + run_data.update(model_data) + + agent_data = [ + { + "RunId": run_id, + "iteration": iteration, + "Step": step, + **agent_data, + } + for agent_data in all_agents_data + ] + + return run_data, agent_data + + +def batch_run( + params, + iterations, + number_processes, + max_steps, + progressbar, + collect_agent_data, + file_prefix, + max_runs, + param_choice, +): + run_params = params.get(param_choice) + + param_combinations = _make_model_kwargs(run_params) + total_param_combinations = len(param_combinations) + total_runs = total_param_combinations * iterations + 
print( + f"{total_param_combinations} parameter combinations, " + + f"{iterations} iteration{'s' if iterations != 1 else ''}, " + + f"{total_runs} total runs" + ) + + # create a list of all the parameters to run, with run id and iteration + runs_list = [] + run_id = 0 + for params in param_combinations: + for iteration in range(iterations): + runs_list.append((run_id, iteration, params, max_steps)) + run_id += 1 + + # if maximum runs is specified, truncate the list of run arguments + if max_runs: + runs_list = runs_list[:max_runs] + + # collect data in a directory for this model + data_dir = os.path.join("data", "hawkdovemulti") + os.makedirs(data_dir, exist_ok=True) + datestr = datetime.today().isoformat().replace(".", "_").replace(":", "") + model_output_filename = os.path.join(data_dir, f"{file_prefix}{datestr}_model.csv") + if collect_agent_data: + agent_output_filename = os.path.join( + data_dir, f"{file_prefix}{datestr}_agent.csv" + ) + + message = f"Saving data collection results to:\n {model_output_filename}" + if collect_agent_data: + message += f"\n {agent_output_filename}" + print(message) + + # open output files so data can be written as it is generated + with open(model_output_filename, "w", newline="") as model_output_file: + if collect_agent_data: + agent_output_file = open(agent_output_filename, "w", newline="") + + model_dict_writer = None + agent_dict_writer = None + + # adapted from mesa batch run code + with tqdm(total=total_runs, disable=not progressbar) as pbar: + with multiprocessing.Pool(number_processes) as pool: + for model_data, agent_data in pool.imap_unordered( + run_hawkdovemulti_model, runs_list + ): + # initialize dictwriter and start csv after the first batch + if model_dict_writer is None: + # get field names from first entry + model_dict_writer = csv.DictWriter( + model_output_file, model_data.keys() + ) + model_dict_writer.writeheader() + + model_dict_writer.writerow(model_data) + + if collect_agent_data: + if agent_dict_writer is None: + # get field names from first entry + agent_dict_writer = csv.DictWriter( + agent_output_file, agent_data[0].keys() + ) + agent_dict_writer.writeheader() + + agent_dict_writer.writerows(agent_data) + + pbar.update() + + if collect_agent_data: + agent_output_file.close() + + +def main(): + parser = argparse.ArgumentParser( + prog="hawk/dove batch_run", + description="Batch run for hawk/dove multiple risk attitude simulation.", + epilog="""Data files will be created in data/hawkdovemulti/ + relative to current path.""", + ) + parser.add_argument( + "-i", + "--iterations", + type=int, + help="Number of iterations to run for each set of parameters " + + "(default: %(default)s)", + default=100, + ) + parser.add_argument( + "-m", + "--max-steps", + help="Maximum steps to run simulations if they have not already " + + "converged (default: %(default)s)", + default=1000, # new convergence logic seems to converge around 400 + type=int, + ) + parser.add_argument( + "-p", + "--processes", + type=int, + help="Number of processes to use (default: all available CPUs)", + default=None, + ) + parser.add_argument( + "--progress", + help="Display progress bar", + action=argparse.BooleanOptionalAction, + default=True, + ) + parser.add_argument( + "--agent-data", + help="Store agent data", + action=argparse.BooleanOptionalAction, + default=False, + ) + parser.add_argument( + "--file-prefix", + help="Prefix for data filenames (no prefix by default)", + default="", + ) + parser.add_argument( + "--max-runs", + help="Stop after the specified 
number of runs " + + "(for development/troubleshooting)", + type=int, + default=None, + ) + parser.add_argument( + "--params", + help="Run a specific set of parameters", + choices=params.keys(), + default="default", + ) + + args = parser.parse_args() + batch_run( + params, + args.iterations, + args.processes, + args.max_steps, + args.progress, + args.agent_data, + args.file_prefix, + args.max_runs, + args.params, + ) + + +if __name__ == "__main__": + main() diff --git a/simulatingrisk/hawkdovemulti/model.py b/simulatingrisk/hawkdovemulti/model.py new file mode 100644 index 0000000..9d0d210 --- /dev/null +++ b/simulatingrisk/hawkdovemulti/model.py @@ -0,0 +1,430 @@ +import statistics +from collections import Counter +from enum import IntEnum +from functools import cached_property + + +from simulatingrisk.hawkdove.model import HawkDoveModel, HawkDoveAgent + + +class HawkDoveMultipleRiskAgent(HawkDoveAgent): + """ + An agent with random risk attitude playing Hawk or Dove. Optionally + adjusts risks based on most successful neighbor, depending on model + configuration. + """ + + #: points since last adjustment round; starts at 0 + recent_points = 0 + + #: whether or not risk level changed on the last adjustment round + risk_level_changed = False + + def set_risk_level(self): + # get risk attitude from model based on configured distribution + self.risk_level = self.model.get_risk_attitude() + + def play(self): + # save total points before playing so we only need to calculate + # current round payoff once + prev_points = self.points + super().play() + # when enabled by the model, periodically adjust risk level + + # add payoff from current round to recent points + self.recent_points += self.points - prev_points + + if self.model.adjustment_round: + self.adjust_risk() + # reset to zero to track points until next adjustment round + self.recent_points = 0 + + @property + def adjust_neighbors(self): + """neighbors to look at when adjusting risk attitude; uses + model adjust_neighborhood size""" + return self.get_neighbors(self.model.adjust_neighborhood) + + @cached_property + def compare_payoff_field(self): + """determine which payoff to compare depending on model option: + (cumulative/total or points since last adjustment round)""" + return "recent_points" if self.model.adjust_payoff == "recent" else "points" + + @property + def compare_payoff(self): + """payoff value to use for adjustment comparison + (depends on model configuration)""" + return getattr(self, self.compare_payoff_field) + + @property + def most_successful_neighbor(self): + """identify and return the neighbor with the most points""" + # sort neighbors by points, highest points first + # adapted from risky bet wealthiest neighbor + + return sorted( + self.adjust_neighbors, + key=lambda x: getattr(x, self.compare_payoff_field), + reverse=True, + )[0] + + def adjust_risk(self): + # look at neighbors + # if anyone has more points + # either adopt their risk attitude or average theirs with yours + + best = self.most_successful_neighbor + + # if most successful neighbor has more points and a different + # risk attitude, adjust + if ( + best.compare_payoff > self.compare_payoff + and best.risk_level != self.risk_level + ): + # adjust risk based on model configuration + if self.model.risk_adjustment == "adopt": + # adopt neighbor's risk level + self.risk_level = best.risk_level + elif self.model.risk_adjustment == "average": + # average theirs with mine, then round to a whole number + # since this model uses discrete risk levels + 
self.risk_level = round( + statistics.mean([self.risk_level, best.risk_level]) + ) + + # track that risk attitude has been updated + self.risk_level_changed = True + else: + # track that risk attitude was not changed + self.risk_level_changed = False + + +class RiskState(IntEnum): + """Categorization of population risk states""" + + # majority risk inclined + c1 = 1 + c2 = 2 + c3 = 3 + c4 = 4 + + # majority risk moderate + c5 = 5 + c6 = 6 + c7 = 7 + c8 = 8 + + # majority risk avoidant + c9 = 9 + c10 = 10 + c11 = 11 + c12 = 12 + + # no clear majority + c13 = 13 + + @classmethod + def category(cls, val): + # handle both integer and risk state enum value + if isinstance(val, RiskState): + val = val.value + if val in {1, 2, 3, 4}: + return "majority risk inclined" + if val in {5, 6, 7, 8}: + return "majority risk moderate" + if val in {9, 10, 11, 12}: + return "majority risk avoidant" + return "no majority" + + def __str__(self): + # override string method to return just the numeric value, + # for better serialization of collected data + return str(self.value) + + +class HawkDoveMultipleRiskModel(HawkDoveModel): + """ + Model for hawk/dove game with variable risk attitudes. Supports + all parameters in :class:`~simulatingrisk.hawkdove.model.HawkDoveModel` + and adds several parmeters to control if and how agents adjust + their risk attitudes (strategy, frequency, and neighborhood size). + + :param risk_adjustment: strategy agents should use for adjusting risk; + None (default), adopt, or average + :param adjust_every: when risk adjustment is enabled, adjust every + N rounds (default: 10) + :param adjust_neighborhood: size of neighborhood to look at when + adjusting risk attitudes; 4, 8, or 24 (default: play_neighborhood) + :param adjust_payoff: when comparing neighbors points for risk adjustment, + consider cumulative payoff (`total`) or payoff since the + last adjustment round (`recent`) (default: recent) + """ + + risk_attitudes = "variable" + agent_class = HawkDoveMultipleRiskAgent + + supported_risk_adjustments = (None, "adopt", "average") + supported_adjust_payoffs = ("recent", "total") + risk_distribution_options = ( + "uniform", + "normal", + "skewed left", + "skewed right", + "bimodal", + ) + + def __init__( + self, + grid_size, + risk_adjustment="adopt", + risk_distribution="uniform", + adjust_every=10, + adjust_neighborhood=None, + adjust_payoff="recent", + *args, + **kwargs, + ): + # convert string input from solara app parameters to None + if risk_adjustment == "none": + risk_adjustment = None + + # check parameters + if risk_distribution not in self.risk_distribution_options: + raise ValueError( + f"Unsupported risk distribution '{risk_distribution}'; " + + f"must be one of { ', '.join(self.risk_distribution_options) }" + ) + + # make sure risk adjustment is valid + if risk_adjustment not in self.supported_risk_adjustments: + risk_adjust_opts = ", ".join( + [opt or "none" for opt in self.supported_risk_adjustments] + ) + raise ValueError( + f"Unsupported risk adjustment '{risk_adjustment}'; " + + f"must be one of {risk_adjust_opts}" + ) + if adjust_payoff not in self.supported_adjust_payoffs: + adjust_payoffs_opts = ", ".join(self.supported_adjust_payoffs) + raise ValueError( + f"Unsupported adjust payoff option '{adjust_payoff}'; " + + f"must be one of {adjust_payoffs_opts}" + ) + + # initialize a risk attitude generator based on configured distrbution + # must be set before calling super for agent init + self.risk_distribution = risk_distribution + self.risk_attitude_generator = 
self.get_risk_attitude_generator() + + super().__init__(grid_size, *args, **kwargs) + + self.risk_adjustment = risk_adjustment + self.adjust_round_n = adjust_every + # if adjust neighborhood is not specified, then use the same size + # as play neighborhood + self.adjust_neighborhood = adjust_neighborhood or self.play_neighborhood + # store whether to compare cumulative payoff or since last adjustment round + self.adjust_payoff = adjust_payoff + + def _risk_level_in_bounds(self, value): + # check if a generated risk level is within bounds + return self.min_risk_level <= value <= self.max_risk_level + + def get_risk_attitude_generator(self): + """return a generator that will return risk attitudes for individual + agents based on the configured distribution.""" + if self.risk_distribution == "uniform": + # uniform/random: generate random integer within risk level range + while True: + yield self.random.randint(self.min_risk_level, self.max_risk_level) + if self.risk_distribution == "normal": + # return values from a normal distribution centered around 4.5 + while True: + yield round(self.random.gauss(4.5, 1.5)) + elif self.risk_distribution == "skewed left": + # return values from a triangler distribution centered around 0 + while True: + yield round( + self.random.triangular(self.min_risk_level, self.max_risk_level, 0) + ) + elif self.risk_distribution == "skewed right": + # return values from a triangular distribution centered around 9 + while True: + yield round( + self.random.triangular(self.min_risk_level, self.max_risk_level, 9) + ) + elif self.risk_distribution == "bimodal": + # to generate a bimodal distribution, alternately generate + # values from two different normal distributions centered + # around the beginning and end of our risk attitude range + while True: + yield round(self.random.gauss(0, 1.5)) + yield round(self.random.gauss(9, 1.5)) + # NOTE: on smaller grids, using 0/9 makes it extremely + # unlikely to get mid-range risk values (4/5) + + def get_risk_attitude(self): + """return the next value from risk attitude generator, based on + configured distribution.""" + val = next(self.risk_attitude_generator) + + # for bimodal distribution, clamp values to range + if self.risk_distribution == "bimodal": + return max(self.min_risk_level, min(self.max_risk_level, val)) + + # for all other distributions: + # occasionally generators will return values that are out of range. 
+ # rather than capping to the min/max and messing up the distribution, + # just get the next value + while not self._risk_level_in_bounds(val): + val = next(self.risk_attitude_generator) + return val + + @property + def adjustment_round(self) -> bool: + """is the current round an adjustment round?""" + # check if the current step is an adjustment round + # when risk adjustment is enabled, agents should adjust their risk + # strategy every N rounds; + return ( + self.risk_adjustment + and self.schedule.steps > 0 + and self.schedule.steps % self.adjust_round_n == 0 + ) + + def get_data_collector_options(self): + # in addition to common hawk/dove data points, + # we want to include population risk category + opts = super().get_data_collector_options() + model_reporters = { + "population_risk_category": "population_risk_category", + "num_agents_risk_changed": "num_agents_risk_changed", + } + for risk_level in range(self.min_risk_level, self.max_risk_level + 1): + field = f"total_r{risk_level}" + model_reporters[field] = field + + opts["model_reporters"].update(model_reporters) + opts["agent_reporters"].update({"risk_level_changed": "risk_level_changed"}) + return opts + + def step(self): + # delete cached property before the next round begins, + # so we recalcate values for current round before collecting data + try: + del self.total_per_risk_level + except AttributeError: + # property hasn't been set yet on the first round, ok to ignore + pass + super().step() + + @property + def num_agents_risk_changed(self): + return len([a for a in self.schedule.agents if a.risk_level_changed]) + + @property + def converged(self): + # check if the simulation is stable and should stop running + # based on the number of agents changing their risk level + + # checking whether agents risk level changed only works + # when adjustmend is enabled; if it is not, fallback + # do base model logic, which is based on rolling avg % hawk + if not self.risk_adjustment: + return super().converged + + return ( + self.schedule.steps > max(self.adjust_round_n, 50) + and self.num_agents_risk_changed == 0 + ) + + @cached_property + def total_per_risk_level(self): + # tally the number of agents for each risk level + return Counter([a.risk_level for a in self.schedule.agents]) + + def __getattr__(self, attr): + # support dynamic properties for data collection on total by risk level + if attr.startswith("total_r"): + try: + r = int(attr.replace("total_r", "")) + # only handle risk levels that are in bounds + if r > self.max_risk_level or r < self.min_risk_level: + raise AttributeError + return self.total_per_risk_level[r] + except ValueError: + # ignore and throw attribute error + pass + + raise AttributeError + + @property + def population_risk_category(self): + # calculate a category of risk distribution for the population + # based on the proportion of agents in different risk categories + # (categorization scheme defined by LB) + + # count the number of agents in three groups: + risk_counts = self.total_per_risk_level + # Risk-inclined (RI) : r = 0, 1, 2 + # Risk-moderate (RM): r = 3, 4, 5 + # Risk-avoidant (RA): r = 6, 7, 8 + total = { + "risk_inclined": risk_counts[0] + risk_counts[1] + risk_counts[2], + "risk_moderate": risk_counts[3] + + risk_counts[4] + + risk_counts[5] + + risk_counts[6], + "risk_avoidant": risk_counts[7] + risk_counts[8] + risk_counts[9], + } + # for each group, calculate percent of agents in that category + total_agents = len(self.schedule.agents) + percent = {key: val / total_agents for key, val in 
total.items()} + + # majority risk inclined (> 50%) + if percent["risk_inclined"] > 0.5: + # If < 10% are RM & < 10% are RA: let c = 1 + if percent["risk_moderate"] < 0.1 and percent["risk_avoidant"] < 0.1: + return RiskState.c1 + # If > 10% are RM & < 10% are RA: let c = 2 + if percent["risk_moderate"] > 0.1 and percent["risk_avoidant"] < 0.1: + return RiskState.c2 + # If > 10% are RM & > 10% are RA: let c = 3 + if percent["risk_moderate"] > 0.1 and percent["risk_avoidant"] > 0.1: + return RiskState.c3 + # If < 10% are RM & > 10% are RA: let c = 4 + if percent["risk_moderate"] < 0.1 and percent["risk_avoidant"] > 0.1: + return RiskState.c4 + + # majority risk moderate + if percent["risk_moderate"] > 0.5: + # If < 10% are RI & < 10% are RA: let c = 7 + if percent["risk_inclined"] < 0.1 and percent["risk_avoidant"] < 0.1: + return RiskState.c7 + # If > 10% are RI & < 10% are RA: let c = 5 + if percent["risk_inclined"] > 0.1 and percent["risk_avoidant"] < 0.1: + return RiskState.c5 + # If > 10% are RI & > 10% are RA: let c = 6 + if percent["risk_inclined"] > 0.1 and percent["risk_avoidant"] > 0.1: + return RiskState.c6 + # If < 10% are RI & > 10% are RA: let c = 8 + if percent["risk_inclined"] < 0.1 and percent["risk_avoidant"] > 0.1: + return RiskState.c8 + + # majority risk avoidant + if percent["risk_avoidant"] > 0.5: + # If < 10% are RM & < 10% are RI: let c = 12 + if percent["risk_moderate"] < 0.1 and percent["risk_inclined"] < 0.1: + return RiskState.c12 + # If > 10% are RM & < 10% are RI: let c = 11 + if percent["risk_moderate"] > 0.1 and percent["risk_inclined"] < 0.1: + return RiskState.c11 + # If > 10% are RM & > 10% are RI: let c = 10 + if percent["risk_moderate"] > 0.1 and percent["risk_inclined"] > 0.1: + return RiskState.c10 + # If < 10% are RM & > 10% are RI: let c = 9 + if percent["risk_moderate"] < 0.1 and percent["risk_inclined"] > 0.1: + return RiskState.c9 + + return RiskState.c13 diff --git a/tests/test_hawkdove.py b/tests/test_hawkdove.py new file mode 100644 index 0000000..8d040f4 --- /dev/null +++ b/tests/test_hawkdove.py @@ -0,0 +1,263 @@ +import math +from unittest.mock import Mock, patch +from collections import Counter + +import pytest + +from simulatingrisk.hawkdove.model import ( + HawkDoveAgent, + Play, + HawkDoveSingleRiskModel, + HawkDoveSingleRiskAgent, +) + + +def test_agent_neighbors(): + # initialize model with a small grid, neighborhood of 8 + model = HawkDoveSingleRiskModel(3, play_neighborhood=8, agent_risk_level=4) + # every agent should have 8 neighbors when diagonals are included + assert all([len(agent.play_neighbors) == 8 for agent in model.schedule.agents]) + + # neighborhood of 4 + model = HawkDoveSingleRiskModel(3, play_neighborhood=4, agent_risk_level=2) + assert all([len(agent.play_neighbors) == 4 for agent in model.schedule.agents]) + + # neighborhood of 24 (grid needs to be at least 5x5) + model = HawkDoveSingleRiskModel(5, play_neighborhood=24, agent_risk_level=5) + assert all([len(agent.play_neighbors) == 24 for agent in model.schedule.agents]) + + +def test_agent_initial_choice(): + grid_size = 100 + model = HawkDoveSingleRiskModel(grid_size, agent_risk_level=5) + # for now, initial choice is random (hawk-odds param still todo) + initial_choices = [a.choice for a in model.schedule.agents] + choice_count = Counter(initial_choices) + # default should be around a 50/50 split + half_agents = model.num_agents / 2.0 + for choice, total in choice_count.items(): + assert math.isclose(total, half_agents, rel_tol=0.05) + + +def 
test_agent_initial_choice_hawkodds(): + grid_size = 100 + # specify hawk-odds other than 05 + model = HawkDoveSingleRiskModel(grid_size, hawk_odds=0.3, agent_risk_level=2) + initial_choices = [a.choice for a in model.schedule.agents] + choice_count = Counter(initial_choices) + # expect about 30% hawks + expected_hawks = model.num_agents * 0.3 + assert math.isclose(choice_count[Play.HAWK], expected_hawks, rel_tol=0.05) + + +def test_base_agent_risk_level(): + # base class should raise error because method to set risk level is not defined + with pytest.raises(NotImplementedError): + HawkDoveAgent(1, Mock()) + + +def test_agent_initial_risk_level(): + # single risk agent sets risk level based on model + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=2)) + assert agent.risk_level == 2 + + +def test_agent_repr(): + agent_id = 1 + risk_level = 3 + agent = HawkDoveSingleRiskAgent(agent_id, Mock(agent_risk_level=risk_level)) + assert ( + repr(agent) + == f"" + ) + + +def test_model_single_risk_level(): + risk_level = 3 + model = HawkDoveSingleRiskModel(5, agent_risk_level=risk_level) + for agent in model.schedule.agents: + assert agent.risk_level == risk_level + + # handle zero properly (should not be treated the same as None) + risk_level = 0 + model = HawkDoveSingleRiskModel(5, agent_risk_level=risk_level) + for agent in model.schedule.agents: + assert agent.risk_level == risk_level + + +def test_bad_neighborhood_size(): + with pytest.raises(ValueError): + HawkDoveSingleRiskModel(3, play_neighborhood=3, agent_risk_level=6) + with pytest.raises(ValueError): + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=2)) + agent.get_neighbors(5) + + +def test_observed_neighborhood_size(): + # observed neighborhood size is also configurable + # common options, irrelevant for this test + opts = {"agent_risk_level": 1, "play_neighborhood": 4} + model = HawkDoveSingleRiskModel(3, observed_neighborhood=4, **opts) + assert model.observed_neighborhood == 4 + model = HawkDoveSingleRiskModel(3, observed_neighborhood=8, **opts) + assert model.observed_neighborhood == 8 + model = HawkDoveSingleRiskModel(3, observed_neighborhood=24, **opts) + assert model.observed_neighborhood == 24 + with pytest.raises(ValueError): + HawkDoveSingleRiskModel(3, observed_neighborhood=23, **opts) + + +def test_num_dove_neighbors(): + # initialize an agent with a mock model + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=2)) + mock_neighbors = [ + Mock(last_choice=Play.HAWK), + Mock(last_choice=Play.HAWK), + Mock(last_choice=Play.HAWK), + Mock(last_choice=Play.DOVE), + ] + + with patch.object(HawkDoveSingleRiskAgent, "observed_neighbors", mock_neighbors): + assert agent.num_dove_neighbors == 1 + + +def test_agent_choose(): + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=3)) + # on the first round, nothing should happen (uses initial choice) + agent.model.schedule.steps = 0 + # disable random play for now + agent.model.random_play_odds = 0 + agent.choose() + + # on subsequent rounds, choose based on neighbors and risk level + agent.model.schedule.steps = 1 + + # given a specified number of dove neighbors and risk level + with patch.object(HawkDoveAgent, "proportional_num_dove_neighbors", 3): + # an agent with `r=0` will always take the risky choice + # (any risk is acceptable). 
+ agent.risk_level = 0 + agent.choose() + assert agent.choice == Play.HAWK + + # risk level 2 with 3 doves will play dove + agent.risk_level = 2 + agent.choose() + assert agent.choice == Play.HAWK + + # risk level three with 3 doves will play dove + # (greater than or equal) + agent.risk_level = 3 + agent.choose() + assert agent.choice == Play.HAWK + + # agent with risk level 8 will always play dove + agent.risk_level = 8 + agent.choose() + assert agent.choice == Play.DOVE + + +@patch("simulatingrisk.hawkdove.model.coinflip") +def test_agent_choose_random(mock_coinflip): + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=3)) + agent.model.schedule.steps = 1 + # reset after init, which calls coinflip for initial play + mock_coinflip.reset_mock() + with patch.object(HawkDoveAgent, "proportional_num_dove_neighbors", 2): + # if random play is disabled, should not flip a coin + agent.model.random_play_odds = 0 + agent.choose() + assert mock_coinflip.call_count == 0 + + # some chance of random play + agent.model.random_play_odds = 0.5 + mock_coinflip.side_effect = [True, Play.DOVE] + agent.choose() + # should call twice: once for random play, once for choice + assert mock_coinflip.call_count == 2 + # called for random play with model odds + mock_coinflip.assert_any_call( + [True, False], weight=agent.model.random_play_odds + ) + # called a second time to determine which play to make + mock_coinflip.assert_any_call([Play.HAWK, Play.DOVE]) + assert agent.choice == Play.DOVE + + +def test_proportional_num_dove_neighbors(): + model = HawkDoveSingleRiskModel(4, agent_risk_level=3) + agent = HawkDoveSingleRiskAgent(1, model) + + ## equal play/observed; scales to 9 (risk level range) + model.observed_neighborhood = 4 + with patch.object(HawkDoveAgent, "num_dove_neighbors", 3): + assert agent.proportional_num_dove_neighbors == 7 + + model.observed_neighborhood = 8 + with patch.object(HawkDoveAgent, "num_dove_neighbors", 5): + assert agent.proportional_num_dove_neighbors == 6 + + # observe more than 8 + model.observed_neighborhood = 24 + with patch.object(HawkDoveAgent, "num_dove_neighbors", 20): + assert agent.proportional_num_dove_neighbors == 8 + + +def test_agent_choose_when_observe_play_differ(): + # confirm that adjusted value is used to determine play + + model = HawkDoveSingleRiskModel( + 4, agent_risk_level=3, observed_neighborhood=24, play_neighborhood=8 + ) + agent = HawkDoveSingleRiskAgent(3, model) + with patch.object(HawkDoveAgent, "num_dove_neighbors", 5): + agent.choose() == Play.DOVE + + with patch.object(HawkDoveAgent, "num_dove_neighbors", 6): + agent.choose() == Play.HAWK + + +def test_agent_play(): + agent = HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=3)) + # on the first round, last choice should be unset + assert agent.last_choice is None + assert agent.points == 0 + + # set initial choice and supply mock neighbors + # so we can test expected results + agent.choice = Play.HAWK + neighbor_hawk = Mock(choice=Play.HAWK) + neighbor_dove = Mock(choice=Play.DOVE) + with patch.object(HawkDoveAgent, "play_neighbors", [neighbor_hawk, neighbor_dove]): + agent.play() + # should get 3 points against dove and 0 against the hawk + assert agent.points == 3 + 0 + # should store current choice for next round + assert agent.last_choice == Play.HAWK + + +def test_agent_payoff(): + # If I play HAWK and neighbor plays DOVE: 3 + # If I play DOVE and neighbor plays DOVE: 2 + # If I play DOVE and neighbor plays HAWK: 1 + # If I play HAWK and neighbor plays HAWK: 0 + + agent = 
HawkDoveSingleRiskAgent(1, Mock(agent_risk_level=2)) + other_agent = HawkDoveSingleRiskAgent(2, Mock(agent_risk_level=3)) + # If I play HAWK and neighbor plays DOVE: 3 + agent.choice = Play.HAWK + other_agent.choice = Play.DOVE + assert agent.payoff(other_agent) == 3 + # inverse: play DOVE and neighbor plays HAWK: 1 + assert other_agent.payoff(agent) == 1 + + # if both play hawk, payoff is zero for both + other_agent.choice = Play.HAWK + assert agent.payoff(other_agent) == 0 + assert other_agent.payoff(agent) == 0 + + # if both play dove, payoff is two for both + agent.choice = Play.DOVE + other_agent.choice = Play.DOVE + assert agent.payoff(other_agent) == 2 + assert other_agent.payoff(agent) == 2 diff --git a/tests/test_hawkdovemulti.py b/tests/test_hawkdovemulti.py new file mode 100644 index 0000000..9dd756e --- /dev/null +++ b/tests/test_hawkdovemulti.py @@ -0,0 +1,439 @@ +import statistics +from unittest.mock import patch, Mock + +import pytest + +from simulatingrisk.hawkdove.model import Play +from simulatingrisk.hawkdovemulti.model import ( + HawkDoveMultipleRiskModel, + HawkDoveMultipleRiskAgent, + RiskState, +) + + +def test_init(): + model = HawkDoveMultipleRiskModel(5) + # defaults + assert model.risk_adjustment == "adopt" + assert model.hawk_odds == 0.5 + assert model.play_neighborhood == 8 + assert model.adjust_neighborhood == 8 + assert model.adjust_round_n == 10 + assert model.risk_distribution == "uniform" + + # init with risk adjustment + model = HawkDoveMultipleRiskModel( + 5, + play_neighborhood=4, + hawk_odds=0.2, + risk_adjustment="adopt", + adjust_every=5, + adjust_neighborhood=24, + ) + + assert model.risk_adjustment == "adopt" + assert model.adjust_round_n == 5 + assert model.hawk_odds == 0.2 + assert model.play_neighborhood == 4 + assert model.adjust_neighborhood == 24 + + # handle string none for solara app parameters + model = HawkDoveMultipleRiskModel(5, risk_adjustment="none") + assert model.risk_adjustment is None + + # complain about invalid adjustment type + with pytest.raises(ValueError, match="Unsupported risk adjustment 'bogus'"): + HawkDoveMultipleRiskModel(3, risk_adjustment="bogus") + + # complain about invalid adjust payoff + with pytest.raises(ValueError, match="Unsupported adjust payoff option 'bogus'"): + HawkDoveMultipleRiskModel(3, adjust_payoff="bogus") + + +def test_init_variable_risk_level(): + model = HawkDoveMultipleRiskModel(5) + # when risk level is variable/random, agents should have different risk levels + risk_levels = set([agent.risk_level for agent in model.schedule.agents]) + assert len(risk_levels) > 1 + + +adjustment_testdata = [ + # init parameters, expected adjustment round + ({"risk_adjustment": None}, None), + ({"risk_adjustment": "adopt"}, 10), + ({"risk_adjustment": "average"}, 10), + ({"risk_adjustment": "average", "adjust_every": 3}, 3), +] + + +@pytest.mark.parametrize("params,expect_adjust_step", adjustment_testdata) +def test_adjustment_round(params, expect_adjust_step): + model = HawkDoveMultipleRiskModel(3, **params) + + run_for = (expect_adjust_step or 10) + 1 + + # step through the model enough rounds to encounter one adjustment rounds + # if adjustment is enabled; start at 1 (step count starts at 1) + for i in range(1, run_for): + model.step() + if i == expect_adjust_step: + assert model.adjustment_round + else: + assert not model.adjustment_round + + +def test_total_per_risk_level(): + model = HawkDoveMultipleRiskModel(3) + model.schedule = Mock() + # add a few agents with different risk levels + mock_agents = [ + 
Mock(risk_level=0), + Mock(risk_level=1), + Mock(risk_level=1), + Mock(risk_level=2), + Mock(risk_level=2), + Mock(risk_level=2), + Mock(risk_level=5), + ] + model.schedule.agents = mock_agents + + totals = model.total_per_risk_level + assert totals[0] == 1 + assert totals[1] == 2 + assert totals[2] == 3 + assert totals[4] == 0 + assert totals[5] == 1 + assert totals[8] == 0 + + # check caching works as desired + mock_agents.append(Mock(risk_level=8)) + model.schedule.agents = mock_agents + # cached total should not change even though agents have changed + assert model.total_per_risk_level[8] == 0 + # step should reset catched property + with patch("builtins.super"): + model.step() + # now the count should be updated + assert model.total_per_risk_level[8] == 1 + + +def test_total_rN_attr(): + # dynamic attributes to get total per risk level, for data collection + model = HawkDoveMultipleRiskModel(3) + model.schedule = Mock() + # add a few agents with different risk levels + model.schedule.agents = [ + Mock(risk_level=0), + Mock(risk_level=1), + Mock(risk_level=1), + Mock(risk_level=2), + Mock(risk_level=2), + Mock(risk_level=2), + ] + assert model.total_r0 == 1 + assert model.total_r1 == 2 + assert model.total_r2 == 3 + assert model.total_r4 == 0 + + # error handling + # - non-numeric + with pytest.raises(AttributeError): + model.total_rfour + # - out of bounds + with pytest.raises(AttributeError): + model.total_r23 + # - unsupported attribute + with pytest.raises(AttributeError): + model.some_other_total + + +def test_population_risk_category(): + model = HawkDoveMultipleRiskModel(3) + model.schedule = Mock() + + # majority risk inclined + model.schedule.agents = [Mock(risk_level=0), Mock(risk_level=1), Mock(risk_level=2)] + assert model.population_risk_category == RiskState.c1 + # three risk-inclined agents and one risk moderate + del model.total_per_risk_level # reset cached property + model.schedule.agents.append(Mock(risk_level=4)) + assert model.population_risk_category == RiskState.c2 + + # majority risk moderate + model.schedule.agents = [Mock(risk_level=4), Mock(risk_level=5), Mock(risk_level=6)] + del model.total_per_risk_level # reset cached property + assert model.population_risk_category == RiskState.c7 + + # majority risk avoidant + model.schedule.agents = [Mock(risk_level=7), Mock(risk_level=8), Mock(risk_level=9)] + del model.total_per_risk_level # reset cached property + assert model.population_risk_category == RiskState.c12 + + +def test_riskstate_label(): + # enum value or integer value + assert RiskState.category(RiskState.c1) == "majority risk inclined" + assert RiskState.category(2) == "majority risk inclined" + assert RiskState.category(RiskState.c5) == "majority risk moderate" + assert RiskState.category(6) == "majority risk moderate" + assert RiskState.category(RiskState.c11) == "majority risk avoidant" + assert RiskState.category(RiskState.c13) == "no majority" + assert RiskState.category(13) == "no majority" + + +def test_riskstate_str(): + # serialize as string of number for data output in batch runs + assert str(RiskState.c1) == "1" + assert str(RiskState.c13) == "13" + + +def test_most_successful_neighbor(): + # initialize two agents with a mock model + # first, measure success based on total/cumulative payoff + agent_total = HawkDoveMultipleRiskAgent( + 1, + HawkDoveMultipleRiskModel(1, observed_neighborhood=8, adjust_payoff="total"), + 1000, + ) + agent_recent = HawkDoveMultipleRiskAgent( + 2, + HawkDoveMultipleRiskModel(1, observed_neighborhood=8, 
+        1000,
+    )
+
+    mock_neighbors = [
+        Mock(points=12, recent_points=2),
+        Mock(points=14, recent_points=13),
+        Mock(points=23, recent_points=5),
+        Mock(points=31, recent_points=8),
+    ]
+
+    with patch.object(HawkDoveMultipleRiskAgent, "adjust_neighbors", mock_neighbors):
+        # comparing by total points
+        assert agent_total.most_successful_neighbor.points == 31
+        # comparing by recent points
+        assert agent_recent.most_successful_neighbor.recent_points == 13
+
+
+def test_compare_payoff():
+    # test payoff fields depending on model config (recent/total)
+    agent_total = HawkDoveMultipleRiskAgent(
+        1,
+        HawkDoveMultipleRiskModel(1, observed_neighborhood=8, adjust_payoff="total"),
+        1000,
+    )
+    agent_total.points = 100
+    agent_total.recent_points = 10
+
+    assert agent_total.compare_payoff_field == "points"
+    assert agent_total.compare_payoff == 100
+
+    agent_recent = HawkDoveMultipleRiskAgent(
+        2,
+        HawkDoveMultipleRiskModel(1, observed_neighborhood=8, adjust_payoff="recent"),
+        1000,
+    )
+    agent_recent.points = 250
+    agent_recent.recent_points = 25
+    assert agent_recent.compare_payoff_field == "recent_points"
+    assert agent_recent.compare_payoff == 25
+
+
+def test_agent_play_points():
+    mock_model = HawkDoveMultipleRiskModel(3)
+    agent = HawkDoveMultipleRiskAgent(1, mock_model)
+    agent.points = 100
+    agent.recent_points = 10
+
+    # set initial choice and supply mock neighbors
+    # so we can test expected results
+    agent.choice = Play.HAWK
+    neighbor_hawk = Mock(choice=Play.HAWK)
+    neighbor_dove = Mock(choice=Play.DOVE)
+    neighbor_dove2 = Mock(choice=Play.DOVE)
+    with patch.object(
+        HawkDoveMultipleRiskAgent,
+        "play_neighbors",
+        [neighbor_hawk, neighbor_dove, neighbor_dove2],
+    ):
+        agent.play()
+        # should get 3 points against each dove (3*2) and 0 against the hawk;
+        # payoff for current round should be added to points and recent points
+        assert agent.points == 106
+        assert agent.recent_points == 16
+
+
+def test_agent_play_adjust():
+    mock_model = Mock(
+        risk_adjustment="adopt", observed_neighborhood=4, max_risk_level=8
+    )
+    agent = HawkDoveMultipleRiskAgent(1, mock_model)
+    # simulate points from previous rounds
+    agent.recent_points = 250
+    # simulate no neighbors to skip payoff calculation
+    with patch.object(
+        HawkDoveMultipleRiskAgent, "play_neighbors", new=[]
+    ):
+        with patch.object(HawkDoveMultipleRiskAgent, "adjust_risk") as mock_adjust_risk:
+            # when it is not an adjustment round, should not call adjust risk
+            mock_model.adjustment_round = False
+            agent.play()
+            assert mock_adjust_risk.call_count == 0
+            # recent points should not be reset if not an adjustment round
+            assert agent.recent_points
+
+            # should call adjust risk when the model indicates
+            mock_model.adjustment_round = True
+            agent.play()
+            assert mock_adjust_risk.call_count == 1
+            # recent points should reset on adjustment round
+            assert agent.recent_points == 0
+
+
+def test_adjust_risk_adopt_total():
+    # initialize an agent with a mock model
+    model = Mock(
+        risk_adjustment="adopt",
+        observed_neighborhood=4,
+        max_risk_level=8,
+        adjust_payoff="total",
+    )
+    agent = HawkDoveMultipleRiskAgent(1, model)
+    # set a known risk level
+    agent.risk_level = 2
+    # adjust wealth as if the model had run
+    agent.points = 20
+    # set a mock neighbor with more points than current agent
+    neighbor = HawkDoveMultipleRiskAgent(2, model)
+    neighbor.risk_level = 3
+    neighbor.points = 15000
+    with patch.object(HawkDoveMultipleRiskAgent, "most_successful_neighbor", neighbor):
+        agent.adjust_risk()
+        # default behavior is to adopt successful risk level
+        assert agent.risk_level == neighbor.risk_level
+        agent.recent_points = 0
+
+        # now give the most successful neighbor fewer points than the agent
+        neighbor.points = 12
+        agent.recent_points = 5
+        neighbor.risk_level = 3
+        prev_risk_level = agent.risk_level
+        agent.adjust_risk()
+        # risk level should not be changed
+        assert agent.risk_level == prev_risk_level
+        agent.recent_points = 0
+
+
+def test_adjust_risk_adopt_recent():
+    # initialize an agent with a mock model
+    model = Mock(
+        risk_adjustment="adopt",
+        observed_neighborhood=4,
+        max_risk_level=8,
+        adjust_payoff="recent",
+    )
+
+    agent = HawkDoveMultipleRiskAgent(1, model)
+    # set a known risk level
+    agent.risk_level = 2
+    # adjust wealth as if the model had run
+    agent.recent_points = 12
+    agent.points = 3000
+    # set a mock neighbor with more points than current agent
+    neighbor = HawkDoveMultipleRiskAgent(2, model)
+    neighbor.risk_level = 3
+    neighbor.recent_points = 1500
+    neighbor.points = 200
+    with patch.object(HawkDoveMultipleRiskAgent, "most_successful_neighbor", neighbor):
+        agent.adjust_risk()
+        # default behavior is to adopt successful risk level
+        assert agent.risk_level == neighbor.risk_level
+        # agent should track that risk attitude was updated
+        assert agent.risk_level_changed
+
+        # now give the most successful neighbor fewer points than the agent
+        neighbor.recent_points = 12
+        agent.recent_points = 5
+        neighbor.risk_level = 3
+        prev_risk_level = agent.risk_level
+        agent.adjust_risk()
+        # risk level should not be changed
+        assert agent.risk_level == prev_risk_level
+        # agent should track that risk attitude was not changed
+        assert not agent.risk_level_changed
+
+
+def test_adjust_risk_average():
+    # same as previous test, but with average risk adjustment strategy
+    agent = HawkDoveMultipleRiskAgent(
+        1, Mock(risk_adjustment="average", observed_neighborhood=4, max_risk_level=8)
+    )
+    # set a known risk level
+    agent.risk_level = 2
+    # adjust points as if the model had run
+    agent.points = 300
+    # set a neighbor with more points than current agent
+    neighbor = Mock(compare_payoff=350, risk_level=3)
+    with patch.object(HawkDoveMultipleRiskAgent, "most_successful_neighbor", neighbor):
+        prev_risk_level = agent.risk_level
+        agent.adjust_risk()
+        # new risk level should be average of previous and most successful
+        assert agent.risk_level == round(
+            statistics.mean([neighbor.risk_level, prev_risk_level])
+        )
+
+
+def test_risk_level_in_bounds():
+    model = HawkDoveMultipleRiskModel(3)
+    for i in range(8):
+        assert model._risk_level_in_bounds(i)
+
+    assert not model._risk_level_in_bounds(-1)
+    assert not model._risk_level_in_bounds(10)
+
+
+def test_get_risk_attitude_generator():
+    model = HawkDoveMultipleRiskModel(3)
+    model.random = Mock()
+
+    # check that the correct methods are called depending on risk distribution
+    model.risk_distribution = "uniform"
+    next(model.get_risk_attitude_generator())
+    model.random.randint.assert_called_with(model.min_risk_level, model.max_risk_level)
+
+    model.risk_distribution = "normal"
+    model.random.gauss.return_value = 3.3  # value to convert to int
+    next(model.get_risk_attitude_generator())
+    model.random.gauss.assert_called_with(4.5, 1.5)
+
+    model.risk_distribution = "skewed left"
+    model.random.triangular.return_value = 2.1  # value to round
+    next(model.get_risk_attitude_generator())
+    model.random.triangular.assert_called_with(
+        model.min_risk_level, model.max_risk_level, 0
+    )
+
+    model.risk_distribution = "skewed right"
+    model.random.triangular.return_value = 7.6  # value to round
+    next(model.get_risk_attitude_generator())
+    model.random.triangular.assert_called_with(
+        model.min_risk_level, model.max_risk_level, 9
+    )
+
+    # bimodal returns values from two different distributions; call twice
+    model.risk_distribution = "bimodal"
+    model.random.gauss.return_value = 3.2
+    risk_gen = model.get_risk_attitude_generator()
+    next(risk_gen)
+    next(risk_gen)
+    model.random.gauss.assert_any_call(0, 1.5)
+    model.random.gauss.assert_any_call(9, 1.5)
+
+
+def test_get_risk_attitude():
+    model = HawkDoveMultipleRiskModel(3)
+    model.risk_attitude_generator = (x for x in [3, -1, -5, 4])
+    # should return value in range as-is
+    assert model.get_risk_attitude() == 3
+    # values out of range should be skipped and next valid value returned
+    assert model.get_risk_attitude() == 4