Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Alt-Core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies = [
"numpy == 2.0.2",
"opencv-python == 4.11.0.86",
"typing-extensions>=4.0.0",
"types-requests == 2.32.4.20250611",
"mypy == 1.16.1",
]

[tool.setuptools]
Expand All @@ -38,4 +40,4 @@ namespaces = true
[project.urls]
"Homepage" = "https://github.com/Team488/Alt"
"Bug Tracker" = "https://github.com/Team488/Alt/issues"
"repository" = "https://github.com/Team488/Alt"
"repository" = "https://github.com/Team488/Alt"
168 changes: 132 additions & 36 deletions Alt-Core/src/Alt/Core/Agents/Agent.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,120 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from logging import Logger
from typing import Dict, Optional, Any, ClassVar
from typing import Any, Dict, Optional, Protocol, TypeVar, TYPE_CHECKING

from JXTABLES.XTablesClient import XTablesClient

from ..Utils.network import DEVICEIP
from ..Operators.LogStreamOperator import LogStreamOperator
from ..Operators.UpdateOperator import UpdateOperator
from ..Operators.PropertyOperator import PropertyOperator
from ..Operators.ConfigOperator import ConfigOperator
from ..Operators.ShareOperator import ShareOperator
from ..Operators.StreamOperator import StreamProxy
from ..Operators.TimeOperator import TimeOperator, Timer
from ..Constants.Teams import TEAM
from ..Constants.AgentConstants import Proxy, ProxyType

if TYPE_CHECKING:
from ..Constants.AgentConstants import Proxy, ProxyType
from ..Constants.Teams import TEAM
from ..Operators.ConfigOperator import ConfigOperator
from ..Operators.LogStreamOperator import LogStreamOperator
from ..Operators.PropertyOperator import PropertyOperator
from ..Operators.ShareOperator import ShareOperator
from ..Operators.StreamOperator import StreamProxy
from ..Operators.TimeOperator import TimeOperator, Timer
from ..Operators.UpdateOperator import UpdateOperator


class Agent(Protocol):
hasShutdown: bool = False
hasClosed: bool = False
isCleanedUp: bool = False
isMainThread: bool = False
agentName: str = ""
xclient: XTablesClient
propertyOperator: PropertyOperator
configOperator: ConfigOperator
updateOp: UpdateOperator
timeOp: TimeOperator
Sentinel: Logger
timer: Timer

def _injectCore(
self, shareOperator: ShareOperator, isMainThread: bool, agentName: str
) -> None:
...

def _injectNEW(
self,
xclient: XTablesClient, # new
propertyOperator: PropertyOperator, # new
configOperator: ConfigOperator, # new
updateOperator: UpdateOperator, # new
timeOperator: TimeOperator, # new
logger: Logger, # static/new
) -> None:
...

def getProxy(self, proxyName: str) -> Optional[Proxy]:
...

def _setProxies(self, proxies) -> None:
...

def _cleanup(self) -> None:
...

def getTimer(self) -> Timer:
...

def getTeam(self) -> Optional[TEAM]:
...

def _runOwnCreate(self):
...

@classmethod
def getName(cls) -> str:
...

def create(self) -> None:
...

def runPeriodic(self) -> None:
...

class Agent(ABC):
def isRunning(self) -> bool:
...

def getDescription(self) -> str:
...

def getIntervalMs(self) -> int:
...

def forceShutdown(self) -> None:
...

def onClose(self) -> None:
...

@classmethod
def requestProxies(cls) -> None:
...

@classmethod
def addProxyRequest(cls, proxyName: str, proxyType: ProxyType) -> None:
...

@classmethod
def _getProxyRequests(cls) -> Dict[str, ProxyType]:
...


TAgent = TypeVar("TAgent", bound=Agent)


class AgentBase(ABC):
"""
Base class for all agents.
"""

_proxyRequests: Dict[str, ProxyType] = {}
DEFAULT_LOOP_TIME: int = 0 # 0 ms
TIMERS = "timers"

Expand All @@ -31,8 +125,7 @@ def __init__(self, **kwargs: Any) -> None:
self.isCleanedUp: bool = False
self.isMainThread: bool = False
self.agentName = ""
self.__proxies : Dict[str, Proxy] = {}

self.__proxies: Dict[str, Proxy] = {}

def _injectCore(
self, shareOperator: ShareOperator, isMainThread: bool, agentName: str
Expand Down Expand Up @@ -70,23 +163,24 @@ def _injectNEW(
self.timer = self.timeOp.getTimer(self.TIMERS)
# other than setting variables, nothing should go here

def _setProxies(self, proxies):
def _setProxies(self, proxies) -> None:
self.__proxies = proxies

def _updateNetworkProxyInfo(self):
""" Put information about the proxies used on xtables. This can be used for the dashboard, and other things"""
"""Put information about the proxies used on xtables. This can be used for the dashboard, and other things"""
streamPaths = []
for proxyName, proxy in self.__proxies.items():
if isinstance(proxy, StreamProxy):
streamPaths.append(f"{proxyName}|{proxy.getStreamPath()}")

self.propertyOperator.createCustomReadOnlyProperty("streamPaths", streamPaths, addBasePrefix=True, addOperatorPrefix=True)

self.propertyOperator.createCustomReadOnlyProperty(
"streamPaths", streamPaths, addBasePrefix=True, addOperatorPrefix=True
)

def getProxy(self, proxyName : str) -> Optional[Proxy]:
def getProxy(self, proxyName: str) -> Optional[Proxy]:
return self.__proxies.get(proxyName)

def _cleanup(self):
def _cleanup(self) -> None:
# xclient shutdown occasionally failing?
# self.xclient.shutdown()
self.propertyOperator.deregisterAll()
Expand All @@ -96,7 +190,7 @@ def getTimer(self) -> Timer:
"""Use only when needed, and only when associated with agent"""
if self.timer is None:
raise ValueError("Timer not initialized")

return self.timer

def getTeam(self) -> Optional[TEAM]:
Expand All @@ -112,18 +206,19 @@ def getTeam(self) -> Optional[TEAM]:
return TEAM.BLUE
else:
return TEAM.RED

def _runOwnCreate(self):
""" The agent wants to do its own stuff too... okay."""
"""The agent wants to do its own stuff too... okay."""

logIp = f"http://{DEVICEIP}:5000/{self.agentName}/{LogStreamOperator.LOGPATH}"

self.propertyOperator.createCustomReadOnlyProperty("logIP", logIp, addBasePrefix=True, addOperatorPrefix=True)
self.propertyOperator.createCustomReadOnlyProperty(
"logIP", logIp, addBasePrefix=True, addOperatorPrefix=True
)

self.__ensureProxies()
self._updateNetworkProxyInfo()


@classmethod
def getName(cls):
return cls.__name__
Expand Down Expand Up @@ -176,28 +271,29 @@ def onClose(self) -> None:
pass

# ----- proxy methods -----
def __ensureProxies(self):
def __ensureProxies(self) -> None:
for proxyName, proxyType in self._getProxyRequests().items():
if proxyName not in self.__proxies or type(self.__proxies[proxyName]) is not proxyType:
if (
proxyName not in self.__proxies
or type(self.__proxies[proxyName]) is not proxyType
):
print(type(self.__proxies[proxyName]) is proxyType)
raise RuntimeError(f"Agent proxies are not correcty initialized!\n{self._getProxyRequests()=}\n{self.__proxies.items()=}")
raise RuntimeError(
f"Agent proxies are not correcty initialized!\n{self._getProxyRequests()=}\n{self.__proxies.items()=}"
)

@classmethod
def requestProxies(cls):
""" Override this, and add all of your proxy requests"""
"""Override this, and add all of your proxy requests"""
pass

@classmethod
def addProxyRequest(cls, proxyName : str, proxyType: ProxyType) -> None:
""" Method to request that a stream proxy will be given to this agent to display streams
NOTE: you must override requestProxies() and add your calls to this there, or else it will not be used!
def addProxyRequest(cls, proxyName: str, proxyType: ProxyType) -> None:
"""Method to request that a stream proxy will be given to this agent to display streams
NOTE: you must override requestProxies() and add your calls to this there, or else it will not be used!
"""
if not hasattr(cls, '_proxyRequests'):
cls._proxyRequests = {}

cls._proxyRequests[proxyName] = proxyType

@classmethod
def _getProxyRequests(cls) -> Dict[str, ProxyType]:
return getattr(cls, '_proxyRequests', {})

return getattr(cls, "_proxyRequests", {})
26 changes: 18 additions & 8 deletions Alt-Core/src/Alt/Core/Agents/AgentExample.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from typing import Any
from .Agent import Agent
from __future__ import annotations

from typing import Any, Optional, TYPE_CHECKING

class AgentExample(Agent):
""" This example agent shows how simple it can be to create a task.

This agent creates a name property (which allows you to change its name), and then it tells it to you 50 times before ending.
from .Agent import AgentBase

if TYPE_CHECKING:
from Core.Operators.PropertyOperator import Property


class AgentExample(AgentBase):
"""This example agent shows how simple it can be to create a task.

This agent creates a name property (which allows you to change its name), and then it tells it to you 50 times before ending.
"""

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.timesRun: int = 0
self.nameProp = None
self.nameProp: Optional[Property] = None

def create(self) -> None:
# for example here i can create a propery to configure what to call myself
Expand All @@ -28,6 +35,7 @@ def runPeriodic(self) -> None:
# for example, i can tell the world what im called

self.timesRun += 1
assert self.nameProp is not None
name = self.nameProp.get()
self.projectNameProp.set(name)
self.Sentinel.info(f"My name is {name}")
Expand All @@ -36,7 +44,9 @@ def onClose(self) -> None:
# task cleanup here
# for example, i can tell the world that my time has come
if self.nameProp is not None:
self.Sentinel.info(f"My time has come. Never forget the name {self.nameProp.get()}!")
self.Sentinel.info(
f"My time has come. Never forget the name {self.nameProp.get()}!"
)

def isRunning(self) -> bool:
# condition to keep task running here
Expand Down
42 changes: 23 additions & 19 deletions Alt-Core/src/Alt/Core/Agents/BindableAgent.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
from __future__ import annotations

from abc import abstractmethod
from functools import partial


class BindableAgent:
""" When an Agent requires extra keyword arguments, the Neo class cannot just create an instance of it due to it needing arguments.
To fix this, we use the functools partial method. You can imagine it pre-binding the args of the agent, before it gets created.
This way, when the agent needs to be created, the arguments will be stored inside, and Neo can create an instance.
"""When an Agent requires extra keyword arguments, the Neo class cannot just create an instance of it due to it needing arguments.
To fix this, we use the functools partial method. You can imagine it pre-binding the args of the agent, before it gets created.
This way, when the agent needs to be created, the arguments will be stored inside, and Neo can create an instance.
"""

@classmethod
@abstractmethod
def bind(cls, *args, **kwargs) -> partial:
""" To make it clearer what arguments an agent needs, please override this bind method and specify the same input arguments as the agents __init__
At the moment, this is the only way to change the static method signature of bind, so people know what arguments to provide.
In the method body, you can just call cls._getBindedAgent() with the input arguments.
"""To make it clearer what arguments an agent needs, please override this bind method and specify the same input arguments as the agents __init__
At the moment, this is the only way to change the static method signature of bind, so people know what arguments to provide.
In the method body, you can just call cls._getBindedAgent() with the input arguments.

Example:
``` python
class bindable(Agent, BindableAgent):
# overriding the bind method to make the static method signature clear
@classmethod
def bind(arg1 : str, arg2 : int, ....):
# you can use keyword or positional arguments, but it should match your constructor
return cls._getBindedAgent(arg1, arg2=arg2, ....)
Example:
``` python
class bindable(Agent, BindableAgent):
# overriding the bind method to make the static method signature clear
@classmethod
def bind(arg1 : str, arg2 : int, ....):
# you can use keyword or positional arguments, but it should match your constructor
return cls._getBindedAgent(arg1, arg2=arg2, ....)

def __init__(arg1 : str, arg2 : int, ....):
# same signature as above, ensures that when neo gets the bound agent, it needs no extra arguments
# init things....
```
def __init__(arg1 : str, arg2 : int, ....):
# same signature as above, ensures that when neo gets the bound agent, it needs no extra arguments
# init things....
```
"""
pass

@classmethod
def _getBindedAgent(cls, *args, **kwargs) -> partial:
return partial(cls, *args, **kwargs)
return partial(cls, *args, **kwargs)
6 changes: 4 additions & 2 deletions Alt-Core/src/Alt/Core/Agents/PositionLocalizingAgentBase.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

from typing import Tuple, Any

from .Agent import Agent
from .Agent import AgentBase
from ..Utils import NtUtils


class PositionLocalizingAgentBase(Agent):
class PositionLocalizingAgentBase(AgentBase):
"""Agent -> PositionLocalizingAgentBase

Extending Agent with Localizing capabilites. Supports only XTables
Expand Down
Loading
Loading