diff --git a/adf_core_python/core/agent/agent.py b/adf_core_python/core/agent/agent.py index a5bbee66..ec0933c3 100644 --- a/adf_core_python/core/agent/agent.py +++ b/adf_core_python/core/agent/agent.py @@ -1,5 +1,7 @@ import sys +import time as _time from abc import abstractmethod +from threading import Event from typing import Any, Callable, NoReturn from bitarray import bitarray @@ -91,6 +93,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: self.name = name self.connect_request_id = None @@ -102,6 +105,7 @@ def __init__( self.logger = get_logger( f"{self.__class__.__module__}.{self.__class__.__qualname__}" ) + self.finish_post_connect_event = finish_post_connect_event self.team_name = team_name self.is_debug = is_debug @@ -148,7 +152,7 @@ def post_connect(self) -> None: self._agent_info, ) - self.logger.info(f"config: {self.config}") + self.logger.debug(f"agent_config: {self.config}") def update_step_info( self, time: int, change_set: ChangeSet, hear: list[Command] @@ -293,9 +297,12 @@ def handler_sense(self, msg: Any) -> None: ].intValue, ) ) - self.world_model.merge(change_set) + start_update_info_time = _time.time() self.update_step_info(time, change_set, heard_commands) + self.logger.debug( + f"{time} step calculation time: {_time.time() - start_update_info_time}" + ) def send_acknowledge(self, request_id: int) -> None: ak_ack = AKAcknowledge() diff --git a/adf_core_python/core/agent/office/office.py b/adf_core_python/core/agent/office/office.py index 4c43c4b2..0aed6bcf 100644 --- a/adf_core_python/core/agent/office/office.py +++ b/adf_core_python/core/agent/office/office.py @@ -1,3 +1,5 @@ +from threading import Event + from adf_core_python.core.agent.agent import Agent from adf_core_python.core.agent.config.module_config import ModuleConfig from adf_core_python.core.agent.develop.develop_data import DevelopData @@ -18,6 +20,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: super().__init__( is_precompute, @@ -27,6 +30,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) self._tactics_center = tactics_center self._team_name = team_name @@ -90,6 +94,7 @@ def post_connect(self) -> None: self.precompute_data, self._develop_data, ) + self.finish_post_connect_event.set() def think(self) -> None: self._tactics_center.think( diff --git a/adf_core_python/core/agent/office/office_ambulance.py b/adf_core_python/core/agent/office/office_ambulance.py index 4076f67a..6bec5308 100644 --- a/adf_core_python/core/agent/office/office_ambulance.py +++ b/adf_core_python/core/agent/office/office_ambulance.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: super().__init__( tactics_center, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/agent/office/office_fire.py b/adf_core_python/core/agent/office/office_fire.py index bcd19460..dcab2bdf 100644 --- a/adf_core_python/core/agent/office/office_fire.py +++ b/adf_core_python/core/agent/office/office_fire.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: super().__init__( tactics_center, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/agent/office/office_police.py b/adf_core_python/core/agent/office/office_police.py index ccfcbca9..b4c9fc4d 100644 --- a/adf_core_python/core/agent/office/office_police.py +++ b/adf_core_python/core/agent/office/office_police.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: super().__init__( tactics_center, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/agent/platoon/platoon.py b/adf_core_python/core/agent/platoon/platoon.py index 01babaaa..36c46a6e 100644 --- a/adf_core_python/core/agent/platoon/platoon.py +++ b/adf_core_python/core/agent/platoon/platoon.py @@ -1,3 +1,5 @@ +from threading import Event + from adf_core_python.core.agent.action.action import Action from adf_core_python.core.agent.agent import Agent from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -19,6 +21,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ) -> None: super().__init__( is_precompute, @@ -28,6 +31,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) self._tactics_agent = tactics_agent self._team_name = team_name @@ -92,6 +96,8 @@ def post_connect(self) -> None: self._develop_data, ) + self.finish_post_connect_event.set() + def think(self) -> None: action: Action = self._tactics_agent.think( self._agent_info, diff --git a/adf_core_python/core/agent/platoon/platoon_ambulance.py b/adf_core_python/core/agent/platoon/platoon_ambulance.py index ca8e1694..8a4d9762 100644 --- a/adf_core_python/core/agent/platoon/platoon_ambulance.py +++ b/adf_core_python/core/agent/platoon/platoon_ambulance.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ): super().__init__( tactics_agent, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/agent/platoon/platoon_fire.py b/adf_core_python/core/agent/platoon/platoon_fire.py index 19357953..01d75bac 100644 --- a/adf_core_python/core/agent/platoon/platoon_fire.py +++ b/adf_core_python/core/agent/platoon/platoon_fire.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ): super().__init__( tactics_agent, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/agent/platoon/platoon_police.py b/adf_core_python/core/agent/platoon/platoon_police.py index 129c6635..569159e9 100644 --- a/adf_core_python/core/agent/platoon/platoon_police.py +++ b/adf_core_python/core/agent/platoon/platoon_police.py @@ -1,3 +1,5 @@ +from threading import Event + from rcrs_core.connection.URN import Entity as EntityURN from adf_core_python.core.agent.config.module_config import ModuleConfig @@ -16,6 +18,7 @@ def __init__( data_storage_name: str, module_config: ModuleConfig, develop_data: DevelopData, + finish_post_connect_event: Event, ): super().__init__( tactics_agent, @@ -25,6 +28,7 @@ def __init__( data_storage_name, module_config, develop_data, + finish_post_connect_event, ) def precompute(self) -> None: diff --git a/adf_core_python/core/component/module/abstract_module.py b/adf_core_python/core/component/module/abstract_module.py index 76605194..4a3f3178 100644 --- a/adf_core_python/core/component/module/abstract_module.py +++ b/adf_core_python/core/component/module/abstract_module.py @@ -1,8 +1,11 @@ from __future__ import annotations +import time from abc import ABC, abstractmethod from typing import TYPE_CHECKING +from adf_core_python.core.logger.logger import get_agent_logger + if TYPE_CHECKING: from adf_core_python.core.agent.communication.message_manager import MessageManager from adf_core_python.core.agent.develop.develop_data import DevelopData @@ -32,6 +35,10 @@ def __init__( self._count_prepare: int = 0 self._count_update_info: int = 0 self._count_update_info_current_time: int = 0 + self._logger = get_agent_logger( + f"{self.__class__.__module__}.{self.__class__.__qualname__}", + self._agent_info, + ) self._sub_modules: list[AbstractModule] = [] @@ -56,7 +63,11 @@ def resume(self, precompute_data: PrecomputeData) -> AbstractModule: def prepare(self) -> AbstractModule: self._count_prepare += 1 for sub_module in self._sub_modules: + start_time = time.time() sub_module.prepare() + self._logger.debug( + f"{self.__class__.__name__}'s sub_module {sub_module.__class__.__name__} prepare time: {time.time() - start_time:.3f}", + ) return self def update_info(self, message_manager: MessageManager) -> AbstractModule: diff --git a/adf_core_python/core/component/tactics/tactics_agent.py b/adf_core_python/core/component/tactics/tactics_agent.py index b5134418..b0925175 100644 --- a/adf_core_python/core/component/tactics/tactics_agent.py +++ b/adf_core_python/core/component/tactics/tactics_agent.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, Optional @@ -130,9 +131,17 @@ def module_resume(self, precompute_data: PrecomputeData) -> None: def module_prepare(self) -> None: for module in self._modules: + start_time = time.time() module.prepare() + self._logger.debug( + f"module {module.__class__.__name__} prepare time: {time.time() - start_time:.3f}", + ) for action in self._actions: + start_time = time.time() action.prepare() + self._logger.debug( + f"module {action.__class__.__name__} prepare time: {time.time() - start_time:.3f}", + ) # for executor in self._command_executor: # executor.prepare() diff --git a/adf_core_python/core/launcher/agent_launcher.py b/adf_core_python/core/launcher/agent_launcher.py index 9b275f65..f9efa15d 100644 --- a/adf_core_python/core/launcher/agent_launcher.py +++ b/adf_core_python/core/launcher/agent_launcher.py @@ -32,7 +32,7 @@ def __init__(self, config: Config): self.config = config self.logger = get_logger(__name__) self.connectors: list[Connector] = [] - self.thread_list: list[threading.Thread] = [] + self.agent_thread_list: list[threading.Thread] = [] def init_connector(self) -> None: loader_name, loader_class_name = self.config.get_value( @@ -63,12 +63,27 @@ def launch(self) -> None: host, port, self.logger ) + connector_thread_list: list[threading.Thread] = [] for connector in self.connectors: threads = connector.connect(component_launcher, self.config, self.loader) - for thread in threads: - thread.daemon = True - thread.start() - self.thread_list.extend(threads) + self.agent_thread_list.extend(threads) - for thread in self.thread_list: + def connect() -> None: + for thread, event in threads.items(): + thread.daemon = True + thread.start() + is_not_timeout = event.wait(5) + if not is_not_timeout: + break + + connector_thread = threading.Thread(target=connect) + connector_thread_list.append(connector_thread) + connector_thread.start() + + for thread in connector_thread_list: + thread.join() + + self.logger.info("All agents have been launched") + + for thread in self.agent_thread_list: thread.join() diff --git a/adf_core_python/core/launcher/config_key.py b/adf_core_python/core/launcher/config_key.py index 69d7d354..d32afb2e 100644 --- a/adf_core_python/core/launcher/config_key.py +++ b/adf_core_python/core/launcher/config_key.py @@ -22,3 +22,4 @@ class ConfigKey: KEY_AMBULANCE_CENTRE_COUNT: Final[str] = "adf.team.office.ambulance.count" KEY_FIRE_STATION_COUNT: Final[str] = "adf.team.office.fire.count" KEY_POLICE_OFFICE_COUNT: Final[str] = "adf.team.office.police.count" + # adf-core-python diff --git a/adf_core_python/core/launcher/connect/component_launcher.py b/adf_core_python/core/launcher/connect/component_launcher.py index d4f6ffd5..8e45009f 100644 --- a/adf_core_python/core/launcher/connect/component_launcher.py +++ b/adf_core_python/core/launcher/connect/component_launcher.py @@ -17,19 +17,12 @@ def make_connection(self) -> Connection: return Connection(self.host, self.port) def connect(self, agent: Agent, _request_id: int) -> None: - # self.logger.bind(agent_id=agent.get_id()) - self.logger.info( - f"{agent.__class__.__name__} connecting to {self.host}:{self.port} request_id: {_request_id}" + f"{agent.__class__.__name__} trying to connect to {self.host}:{self.port} request_id: {_request_id}" ) connection = self.make_connection() try: connection.connect() - # ソケットが使用しているPORT番号を取得 - if connection.socket is not None: - self.logger.info( - f"Connected to {self.host}:{self.port} on port {connection.socket.getsockname()[1]}" - ) except socket.timeout: self.logger.warning(f"Connection to {self.host}:{self.port} timed out") return diff --git a/adf_core_python/core/launcher/connect/connector.py b/adf_core_python/core/launcher/connect/connector.py index 850023af..e517a843 100644 --- a/adf_core_python/core/launcher/connect/connector.py +++ b/adf_core_python/core/launcher/connect/connector.py @@ -16,7 +16,7 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: raise NotImplementedError def get_connected_agent_count(self) -> int: diff --git a/adf_core_python/core/launcher/connect/connector_ambulance_center.py b/adf_core_python/core/launcher/connect/connector_ambulance_center.py index 98e02d8a..1dca5afc 100644 --- a/adf_core_python/core/launcher/connect/connector_ambulance_center.py +++ b/adf_core_python/core/launcher/connect/connector_ambulance_center.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_AMBULANCE_CENTRE_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_ambulance_center() is None: @@ -54,6 +54,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -65,12 +66,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"AmbulanceCenterAgent-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected ambulance center (count: %d)" % count) return threads diff --git a/adf_core_python/core/launcher/connect/connector_ambulance_team.py b/adf_core_python/core/launcher/connect/connector_ambulance_team.py index 350d6bd8..879f313d 100644 --- a/adf_core_python/core/launcher/connect/connector_ambulance_team.py +++ b/adf_core_python/core/launcher/connect/connector_ambulance_team.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_AMBULANCE_TEAM_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_ambulance_team() is None: @@ -54,6 +54,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -65,12 +66,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"AmbulanceTeam-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected ambulance team (count: %d)" % count) return threads diff --git a/adf_core_python/core/launcher/connect/connector_fire_brigade.py b/adf_core_python/core/launcher/connect/connector_fire_brigade.py index 9fc24aa7..ac7ecb37 100644 --- a/adf_core_python/core/launcher/connect/connector_fire_brigade.py +++ b/adf_core_python/core/launcher/connect/connector_fire_brigade.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_FIRE_BRIGADE_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_fire_brigade() is None: @@ -52,6 +52,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -63,12 +64,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"FireBrigadeAgent-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected fire brigade (count: %d)" % count) return threads diff --git a/adf_core_python/core/launcher/connect/connector_fire_station.py b/adf_core_python/core/launcher/connect/connector_fire_station.py index f9f4abc0..de8dbc6b 100644 --- a/adf_core_python/core/launcher/connect/connector_fire_station.py +++ b/adf_core_python/core/launcher/connect/connector_fire_station.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_FIRE_STATION_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_fire_station() is None: @@ -52,6 +52,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -63,12 +64,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"FireStationAgent-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected fire station (count: %d)" % count) return threads diff --git a/adf_core_python/core/launcher/connect/connector_police_force.py b/adf_core_python/core/launcher/connect/connector_police_force.py index 2b3a905d..d2f45b12 100644 --- a/adf_core_python/core/launcher/connect/connector_police_force.py +++ b/adf_core_python/core/launcher/connect/connector_police_force.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_POLICE_FORCE_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_police_force() is None: @@ -52,6 +52,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -63,12 +64,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"PoliceForceAgent-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected police force (count: %d)" % count) return threads diff --git a/adf_core_python/core/launcher/connect/connector_police_office.py b/adf_core_python/core/launcher/connect/connector_police_office.py index 01601fc8..1c7a7f77 100644 --- a/adf_core_python/core/launcher/connect/connector_police_office.py +++ b/adf_core_python/core/launcher/connect/connector_police_office.py @@ -24,12 +24,12 @@ def connect( component_launcher: ComponentLauncher, config: Config, loader: AbstractLoader, - ) -> list[threading.Thread]: + ) -> dict[threading.Thread, threading.Event]: count: int = config.get_value(ConfigKey.KEY_POLICE_OFFICE_COUNT, 0) if count == 0: - return [] + return {} - threads: list[threading.Thread] = [] + threads: dict[threading.Thread, threading.Event] = {} for _ in range(count): if loader.get_tactics_police_office() is None: @@ -54,6 +54,7 @@ def connect( ) request_id: int = component_launcher.generate_request_id() + finish_post_connect_event = threading.Event() thread = threading.Thread( target=component_launcher.connect, args=( @@ -65,12 +66,13 @@ def connect( "test", module_config, develop_data, + finish_post_connect_event, ), request_id, ), name=f"PoliceOfficeAgent-{request_id}", ) - threads.append(thread) + threads[thread] = finish_post_connect_event self.logger.info("Connected police office (count: %d)" % count) return threads diff --git a/adf_core_python/launcher.py b/adf_core_python/launcher.py index e523a1f3..310ff9d7 100644 --- a/adf_core_python/launcher.py +++ b/adf_core_python/launcher.py @@ -98,7 +98,7 @@ def __init__( if value is not None: self.launcher_config.set_value(key, value) - self.logger.info(f"Config: {self.launcher_config}") + self.logger.debug(f"launcher_config: {self.launcher_config}") def launch(self) -> None: agent_launcher: AgentLauncher = AgentLauncher(