diff --git a/src/lpsim/grading/__init__.py b/src/lpsim/grading/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/lpsim/grading/grading.py b/src/lpsim/grading/grading.py new file mode 100644 index 00000000..d9fe5e4d --- /dev/null +++ b/src/lpsim/grading/grading.py @@ -0,0 +1,105 @@ +import json +import os + +from src.lpsim.server.consts import ObjectType + + +TEST_MATCH = os.path.dirname(os.path.abspath(__file__)) + "/match_1.json" + + +class Grading: + def __init__(self, match: str): + with open(match) as f: + self.data = json.load(f) + self.expected_turn = 4 + self.current_turn = 0 + self.character_element = [] + self.grading_rules = {} + self.grade_match(self.data) + + def load_grading(self, obj_type, name, version): + key = "%s+%s+%s" % (obj_type, name, version) + if key in self.grading_rules.keys(): + return self.grading_rules[key] + grading_file = os.path.dirname(os.path.abspath(__file__)) + "/grading_rules/%s/%s+%s" % (obj_type, name, version) + with open(grading_file) as f: + rule = json.load(f) + self.grading_rules[key] = rule + return rule + + def grade_by_rule(self, obj_type, name, version, usage): + rule = self.load_grading(obj_type, name, version) + a = rule[0] + q = rule[1] + n = usage + if usage == 0: + n = max(1, self.expected_turn - self.current_turn) + return a * (1 - pow(q, n)) / (1 - q) + + def grade_match(self, match): + self.current_turn = match["round_number"] + return self.grade_player(match["player_tables"][0]) - self.grade_player(match["player_tables"][1]) + + def grade_player(self, player): + grade = 0 + for character in player["characters"]: + grade += self.grade_character(character) + for status in player["team_status"]: + grade += self.grade_team_status(status) + grade += self.grade_dice(player["dice"]) + grade += self.grade_hands(player["hands"]) + for summon in player["summons"]: + grade += self.grade_summon(summon) + for support in player["supports"]: + grade += self.grade_support(support) + return grade + + def grade_character(self, character): + grade = 0 + if character["name"] == "Barbara": + self.expected_turn += 5 + if character["name"] == "Dehya": + self.expected_turn += 2 + # grading hp + grade += character["hp"] * 10 + # grading attaches + grade += self.grade_attaches(character["attaches"]) + # register elements + self.character_element.append(character["element"]) + return grade + + def grade_attaches(self, attaches): + grade = 0 + for attach in attaches: + print(attach) + if attach["type"] == "character_status": + grade += self.grade_character_status(attach) + else: + grade += self.grade_character_status(attach) + return 0 + + def grade_dice(self, dice): + effect_dice = 0 + for dice in dice["colors"]: + if dice == "OMNI" or dice in self.character_element: + effect_dice += 1 + return effect_dice * 5 + len(dice) + + def grade_hands(self, hands): + return len(hands) * 5 + + def grade_summon(self, summon): + return self.grade_by_rule("summon", summon["name"], summon["version"], summon["usage"]) + + def grade_support(self, support): + return self.grade_by_rule("support", support["name"], support["version"], support["usage"]) + + def grade_character_status(self, status): + return self.grade_by_rule("character_status", status["name"], status["version"], status["usage"]) + + def grade_team_status(self, status): + return self.grade_by_rule("team_status", status["name"], status["version"], status["usage"]) + + +if __name__ == "__main__": + grading = Grading(TEST_MATCH) diff --git a/src/lpsim/model/__init__.py b/src/lpsim/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/lpsim/model/card.json b/src/lpsim/model/card.json new file mode 100644 index 00000000..d86f91a6 --- /dev/null +++ b/src/lpsim/model/card.json @@ -0,0 +1,4 @@ +{ + "name": "", + "version": "" +} diff --git a/src/lpsim/model/character.json b/src/lpsim/model/character.json new file mode 100644 index 00000000..b943a535 --- /dev/null +++ b/src/lpsim/model/character.json @@ -0,0 +1,9 @@ +{ + "name": "", + "version": "", + "hp": -1, + "charge": -1, + "attaches": [], + "element_application": [], + "is_alive": true +} \ No newline at end of file diff --git a/src/lpsim/model/character_status.json b/src/lpsim/model/character_status.json new file mode 100644 index 00000000..3806e77c --- /dev/null +++ b/src/lpsim/model/character_status.json @@ -0,0 +1,6 @@ +{ + "name": "", + "version": "", + "usage": -1, + "max_usage": -1 +} diff --git a/src/lpsim/model/dice.json b/src/lpsim/model/dice.json new file mode 100644 index 00000000..77c9540f --- /dev/null +++ b/src/lpsim/model/dice.json @@ -0,0 +1,3 @@ +{ + "colors": [] +} \ No newline at end of file diff --git a/src/lpsim/model/match.json b/src/lpsim/model/match.json new file mode 100644 index 00000000..f77a2eae --- /dev/null +++ b/src/lpsim/model/match.json @@ -0,0 +1,7 @@ +{ + "name": "", + "version": "", + "player_tables": [], + "current_player": -1, + "round_number": -1 +} \ No newline at end of file diff --git a/src/lpsim/model/model.py b/src/lpsim/model/model.py new file mode 100644 index 00000000..8612faa0 --- /dev/null +++ b/src/lpsim/model/model.py @@ -0,0 +1,241 @@ +import json +import os + +from src.lpsim.server import * +from src.lpsim.server.consts import ObjectType +from src.lpsim.server.dice import Dice +from src.lpsim.server.match import Match +from src.lpsim.server.player_table import PlayerTable +from src.lpsim.utils import * + +PROTOTYPE_PATH = os.path.dirname(os.path.abspath(__file__)) + "/" + + +class model: + def get_prototype(self, obj_str): + with open(PROTOTYPE_PATH + obj_str + ".json") as f: + data = json.load(f) + return data + + def match_to_json(self, match, name): + match_data = self.get_match(match) + with open(PROTOTYPE_PATH + name + ".json", 'w') as f: + json.dump(match_data, f, indent=4) + + def json_to_match(self, name): + with open(PROTOTYPE_PATH + name + ".json", 'r') as f: + data = json.load(f) + return self.set_match(data) + + def get_match(self, match): + match_model = self.get_prototype("match") + match_model["name"] = match.name + match_model["version"] = match.version + match_model["round_number"] = match.round_number + match_model["current_player"] = match.current_player + match_model["player_tables"] = [self.get_player(x) for x in match.player_tables] + return match_model + + def set_match(self, match_model): + match = Match() + match.name = match_model["name"] + match.version = match_model["version"] + match.round_number = match_model["round_number"] + match.current_player = match_model["current_player"] + match.player_tables = [self.set_player(x) for x in match_model["player_tables"]] + return match + + def get_player(self, player): + player_model = self.get_prototype("player") + player_model["name"] = player.name + player_model["version"] = player.version + player_model["player_idx"] = player.player_idx + player_model["active_character_idx"] = player.active_character_idx + player_model["has_round_ended"] = player.has_round_ended + player_model["arcane_legend"] = player.arcane_legend + player_model["plunge_satisfied"] = player.plunge_satisfied + player_model["dice"] = self.get_dice(player.dice) + player_model["characters"] = [self.get_character(x) for x in player.characters] + player_model["team_status"] = [self.get_team_status(x) for x in player.team_status] + player_model["summons"] = [self.get_summon(x) for x in player.summons] + player_model["supports"] = [self.get_support(x) for x in player.supports] + player_model["hands"] = self.get_hands(player.hands) + player_model["table_deck"] = self.get_deck(player.table_deck) + return player_model + + def set_player(self, player_model): + player = PlayerTable( + version=player_model["version"], + player_idx=player_model["player_idx"], + ) + player.dice = self.set_dice(player_model["dice"]) + player.characters = [self.set_character(x) for x in player_model["characters"]] + player.team_status = [self.set_character_status(x) for x in player_model["team_status"]] + player.summons = [self.set_character_status(x) for x in player_model["summons"]] + player.supports = [self.set_character_status(x) for x in player_model["supports"]] + player.hands = self.set_hands(player_model["hands"]) + player.table_deck = self.set_deck(player_model["table_deck"]) + return player + + def get_character(self, character): + character_model = self.get_prototype("character") + character_model["name"] = character.name + character_model["version"] = character.version + character_model["element"] = character.element + character_model["hp"] = character.hp + character_model["charge"] = character.charge + character_model["attaches"] = [self.get_attach(x) for x in character.attaches] + character_model["element_application"] = character.element_application + character_model["is_alive"] = character.is_alive + return character_model + + def set_character(self, character_model): + args = {"name": character_model["name"], "version": character_model["version"]} + character = get_instance(CharacterBase, args) + character.attaches = [self.set_attach(x) for x in character_model["attaches"]] + return character + + def get_attach(self, attach): + if attach.type == ObjectType.CHARACTER_STATUS: + attach_model = self.get_character_status(attach) + attach_model["type"] = "character_status" + return attach_model + else: + attach_model = self.get_card(attach) + attach_model["type"] = "equipment" + return attach_model + + def set_attach(self, attach_model): + if attach_model["type"] == "character_status": + return self.set_character_status(attach_model) + elif attach_model["type"] == "equipment": + return self.set_card(attach_model) + + def get_card(self, card): + if card is None: + return None + card_model = self.get_prototype("card") + card_model["name"] = card.name + card_model["version"] = card.version + return card_model + + def set_card(self, card_model): + if card_model is None: + return None + return get_instance(CardBase, card_model) + + def get_character_status(self, status): + character_status_model = self.get_prototype("character_status") + character_status_model["name"] = status.name + character_status_model["version"] = status.version + character_status_model["usage"] = status.usage + character_status_model["max_usage"] = status.max_usage + return character_status_model + + def set_character_status(self, status_model): + return get_instance(CharacterStatusBase, status_model) + + def get_deck(self, deck): + return [self.get_card(x) for x in deck] + + def set_deck(self, deck_model): + return [get_instance(CardBase, x) for x in deck_model] + + def get_dice(self, dice): + dice_model = self.get_prototype("dice") + dice_model["colors"] = dice.colors + return dice_model + + def set_dice(self, dice_model): + dice = Dice() + dice.colors = dice_model["colors"] + return dice + + def get_hands(self, hands): + return [self.get_card(x) for x in hands] + + def set_hands(self, hands_model): + return [get_instance(CardBase, x) for x in hands_model] + + def get_summon(self, summon): + summon_model = self.get_prototype("summon") + summon_model["name"] = summon.name + summon_model["version"] = summon.version + summon_model["usage"] = summon.usage + summon_model["max_usage"] = summon.max_usage + return summon_model + + def set_summon(self, summon_model): + return get_instance(SummonBase, summon_model) + + def get_support(self, support): + support_model = self.get_prototype("support") + support_model["name"] = support.name + support_model["version"] = support.version + support_model["usage"] = support.usage + return support_model + + def set_support(self, support_model): + return get_instance(SupportBase, support_model) + + def get_team_status(self, status): + team_status_model = self.get_prototype("team_status") + team_status_model["name"] = status.name + team_status_model["version"] = status.version + team_status_model["usage"] = status.usage + team_status_model["max_usage"] = status.max_usage + return team_status_model + + def set_team_status(self, status_model): + return get_instance(TeamStatusBase, status_model) + + +if __name__ == "__main__": + from src.lpsim.server.deck import Deck + from src.lpsim.agents import RandomAgent + agent_0 = RandomAgent(player_idx=0) + agent_1 = RandomAgent(player_idx=1) + deck = Deck.from_str( + ''' + default_version:4.1 + character:Rhodeia of Loch + character:Kamisato Ayaka + character:Yaoyao + Traveler's Handy Sword*5 + Gambler's Earrings*5 + Kanten Senmyou Blessing*5 + Sweet Madame*5 + Abyssal Summons*5 + Fatui Conspiracy*5 + Timmie*5 + ''' + ) + match = Match() + match.set_deck([deck, deck]) + match.config.max_same_card_number = 30 + match.config.history_level = 10 + match.config.check_deck_restriction = False + match.config.initial_hand_size = 20 + match.config.max_hand_size = 30 + match.config.card_number = None + assert match.start()[0] + match.step() + + while match.round_number < 3 \ + and not match.is_game_end(): + if match.need_respond(0): + current_agent = agent_0 + elif match.need_respond(1): + current_agent = agent_1 + else: + raise RuntimeError("no agent need to respond") + resp = current_agent.generate_response(match) + assert resp is not None + match.respond(resp) + match.step() + + Model = model() + Model.match_to_json(match, "match_1") + match = Model.json_to_match("match_1") + Model.match_to_json(match, "match_2") + assert Model.get_prototype("match_1") == Model.get_prototype("match_2") diff --git a/src/lpsim/model/player.json b/src/lpsim/model/player.json new file mode 100644 index 00000000..56217320 --- /dev/null +++ b/src/lpsim/model/player.json @@ -0,0 +1,16 @@ +{ + "name": "", + "version": "", + "player_idx": -1, + "active_charactor_idx": -1, + "has_round_ended": false, + "dice": [], + "team_status": [], + "characters": [], + "summons": [], + "supports": [], + "hands": [], + "table_deck": [], + "arcane_legend": true, + "plunge_satisfied": false +} diff --git a/src/lpsim/model/summon.json b/src/lpsim/model/summon.json new file mode 100644 index 00000000..3806e77c --- /dev/null +++ b/src/lpsim/model/summon.json @@ -0,0 +1,6 @@ +{ + "name": "", + "version": "", + "usage": -1, + "max_usage": -1 +} diff --git a/src/lpsim/model/support.json b/src/lpsim/model/support.json new file mode 100644 index 00000000..6a87c34d --- /dev/null +++ b/src/lpsim/model/support.json @@ -0,0 +1,5 @@ +{ + "name": "", + "version": "", + "usage": -1 +} diff --git a/src/lpsim/model/team_status.json b/src/lpsim/model/team_status.json new file mode 100644 index 00000000..3806e77c --- /dev/null +++ b/src/lpsim/model/team_status.json @@ -0,0 +1,6 @@ +{ + "name": "", + "version": "", + "usage": -1, + "max_usage": -1 +} diff --git a/src/lpsim/server/event_frame.py b/src/lpsim/server/event_frame.py new file mode 100644 index 00000000..8f8b044f --- /dev/null +++ b/src/lpsim/server/event_frame.py @@ -0,0 +1,74 @@ +from typing import Optional, Union + +from .consts import ObjectPositionType +from .event import * + + +class EventFrameController(BaseModel): + frame_list: List[EventFrame] = [] + + def has_event(self): + return len(self.frame_list) != 0 + + def append(self, event_frame): + self.frame_list.append(event_frame) + + def pop(self): + return self.frame_list.pop() + + def run_event_frame(self, match): + while len(self.frame_list): + event_frame = self.frame_list[-1] + if len(event_frame.triggered_actions): + self.act_action(event_frame, match) + return + elif len(event_frame.triggered_objects): + self.get_action(event_frame, match) + elif len(event_frame.events): + self.trigger_event(event_frame, match) + else: + self.frame_list.pop() + # event frame cleared, clear trashbin + match.trashbin.clear() + + def act_action(self, frame, match): + activated_action = frame.triggered_actions.pop(0) + event_args = match._act(activated_action) + new_frame = EventFrame( + events=event_args, + ) + self.frame_list.append(new_frame) + + def get_action(self, frame, match): + event_arg = frame.processing_event + assert event_arg is not None + object_position = frame.triggered_objects.pop(0) + obj = match.get_object(object_position, event_arg.type) + handler_name = f"event_handler_{event_arg.type.name}" + func = getattr(obj, handler_name, None) + if func is not None: + frame.triggered_actions = func(event_arg, match) + self.frame_list[-1] = frame + + def trigger_event(self, event_frame: EventFrame, match) -> None: + """ + trigger new event to update triggered object lists of a EventFrame. + it will take first event from events, put it into processing_event, + and update triggered object lists. + """ + event_arg = event_frame.events.pop(0) + event_frame.processing_event = event_arg + object_list = match.get_object_list() + # add object in trashbin to list + for obj in match.trashbin: + if event_arg.type in obj.available_handler_in_trashbin: + object_list.append(obj) + handler_name = f"event_handler_{event_arg.type.name}" + for obj in object_list: + # for deck objects, check availability + if obj.position.area == ObjectPositionType.DECK: + if event_arg.type not in obj.available_handler_in_deck: + continue + func = getattr(obj, handler_name, None) + if func is not None: + event_frame.triggered_objects.append(obj.position) \ No newline at end of file diff --git a/src/lpsim/server/match.py b/src/lpsim/server/match.py index 5a854e7f..d8d2468c 100644 --- a/src/lpsim/server/match.py +++ b/src/lpsim/server/match.py @@ -7,6 +7,7 @@ from pydantic import PrivateAttr, validator import dictdiffer +from .event_frame import EventFrameController from .summon.base import SummonBase from .status.team_status.base import TeamStatusBase @@ -350,7 +351,8 @@ class Match(BaseModel): current_player: int = -1 player_tables: List[PlayerTable] = [] state: MatchState = MatchState.WAITING - event_frames: List[EventFrame] = [] + event_frames = EventFrameController() + # event_frames: List[EventFrame] = [] requests: List[Requests] = [] winner: int = -1 @@ -709,29 +711,9 @@ def start(self) -> Tuple[bool, Any]: ) ) event_frame = self._stack_events(event_args) - self.empty_frame_assertion( - event_frame=event_frame, - error_message="Initial draw card should not trigger objects.", - ) + self.event_frames.append(event_frame) return True, None - def empty_frame_assertion( - self, event_frame: EventFrame, error_message: str - ) -> None: - while len(event_frame.events): - self._trigger_event(event_frame) - while len(event_frame.triggered_objects): - position = event_frame.triggered_objects.pop(0) - object = self.get_object(position) - event_arg = event_frame.processing_event - assert event_arg is not None - handler_name = "event_handler_" + event_arg.type.value - func = getattr(object, handler_name) - actions = func(event_frame.processing_event, self) - if len(actions): - self._set_match_state(MatchState.ERROR) # pragma no cover - raise AssertionError(error_message) - def step(self, run_continuously: bool = True) -> bool: """ Simulate one step of the match, or run continuously until a response @@ -760,8 +742,8 @@ def step(self, run_continuously: bool = True) -> bool: logging.info("There are still requests not responded.") return False # check if action is needed - elif len(self.event_frames) != 0: - self._next_action() + elif self.event_frames.has_event(): + self.event_frames.run_event_frame(self) # all response and action are cleared, start state transition elif self.state == MatchState.STARTING: self._set_match_state(MatchState.STARTING_CARD_SWITCH) @@ -900,67 +882,6 @@ def is_game_end(self) -> bool: return True return False - def _next_action(self): - """ - Do one action in `self.event_frames`. If the last event frame has - triggered actions, it will do one action and stack new event frame. - If triggered actions is empty and has triggered objects, get - actions and do the first. If have unprocessed event arguments, - trigger objects. If none of all, pop last event frame. - Unless there are no actions, this function will exactly do one action. - """ - assert len(self.event_frames) > 0, "No event frame to process." - while len(self.event_frames): - event_frame = self.event_frames[-1] - if len(event_frame.triggered_actions): - # do one action - activated_action = event_frame.triggered_actions.pop(0) - logging.info(f"Action activated: {activated_action}") - event_args = self._act(activated_action) - self._stack_events(event_args) - return - elif len(event_frame.triggered_objects): - # get actions - event_arg = event_frame.processing_event - assert event_arg is not None - object_position = event_frame.triggered_objects.pop(0) - object = self.get_object(object_position, event_arg.type) - object_name = object.__class__.__name__ - if hasattr(object, "name"): - object_name = object.name # type: ignore - if object is None: - logging.warning( - f"Object {object_position} does not exist. " - "Is it be removed before triggering or a bug?" - ) - else: - handler_name = f"event_handler_{event_arg.type.name}" - func = getattr(object, handler_name, None) - assert func is not None, ( - f"Object {object_name} does not have handler for " - f"{event_arg.type.name}." - ) - event_frame.triggered_actions = func(event_arg, self) - if event_frame.triggered_actions is None: - raise AssertionError( - f"Object {object_name} with event " - f"{event_arg.type} returns None." - ) - if len(event_frame.triggered_actions) > 0: - logging.info( - f"Object {object_name} with event {event_arg.type}" - f", triggered " - f"{len(event_frame.triggered_actions)} actions " - ) - elif len(event_frame.events): - # trigger objects - self._trigger_event(event_frame) - else: - # pop event frame - self.event_frames.pop() - # event frame cleared, clear trashbin - self.trashbin.clear() - def _game_start(self): """ Game started. Will send game start event. @@ -1011,9 +932,7 @@ def _round_start(self): event_frame = EventFrame( events=event_args, ) - self.empty_frame_assertion( - event_frame, "Create dice in round start should not trigger actions." - ) + self.event_frames.append(event_frame) # collect actions triggered by round start # reroll dice chance. reroll times can be modified by objects. for pnum, player_table in enumerate(self.player_tables): @@ -1191,40 +1110,6 @@ def _stack_events( self.event_frames.append(frame) return frame - def _trigger_event( - self, - event_frame: EventFrame, - ) -> EventArguments: - """ - trigger new event to update triggered object lists of a EventFrame. - it will take first event from events, put it into processing_event, - and update triggered object lists. - """ - assert len(event_frame.triggered_objects) == 0 - assert len(event_frame.triggered_actions) == 0 - assert len(event_frame.events) != 0 - event_arg = event_frame.events.pop(0) - event_frame.processing_event = event_arg - object_list = self.get_object_list() - # add object in trashbin to list - for object in self.trashbin: - if event_arg.type in object.available_handler_in_trashbin: - object_list.append(object) - handler_name = f"event_handler_{event_arg.type.name}" - for obj in object_list: - # for deck objects, check availability - if obj.position.area == ObjectPositionType.DECK: - if event_arg.type not in obj.available_handler_in_deck: - continue - name = obj.__class__.__name__ - if hasattr(obj, "name"): # pragma: no cover - name = obj.name # type: ignore - func = getattr(obj, handler_name, None) - if func is not None: - logging.debug(f"Trigger event {event_arg.type.name} " f"for {name}.") - event_frame.triggered_objects.append(obj.position) - return event_arg - def _modify_value( self, value: ModifiableValueBase, @@ -1535,9 +1420,7 @@ def _respond_switch_card(self, response: SwitchCardResponse): ) ) event_frame = self._stack_events(event_args) - self.empty_frame_assertion( - event_frame, "Switch card should not trigger actions now." - ) + self.event_frames.append(event_frame) # remove related requests self.requests = [ req for req in self.requests if req.player_idx != response.player_idx @@ -1560,9 +1443,7 @@ def _respond_reroll_dice(self, response: RerollDiceResponse): """ event_args = self._action_remove_dice(RemoveDiceAction.from_response(response)) event_frame = self._stack_events(list(event_args)) - self.empty_frame_assertion( - event_frame, "Removing dice in Reroll Dice should not trigger actions." - ) + self.event_frames.append(event_frame) event_args = self._action_create_dice( CreateDiceAction( player_idx=response.player_idx, @@ -1571,9 +1452,7 @@ def _respond_reroll_dice(self, response: RerollDiceResponse): ) ) event_frame = self._stack_events(list(event_args)) - self.empty_frame_assertion( - event_frame, "Creating dice in Reroll Dice should not trigger actions." - ) + self.event_frames.append(event_frame) # modify request for num, req in enumerate(self.requests): # pragma: no branch if isinstance(req, RerollDiceRequest): # pragma: no branch