diff --git a/.vscode/launch.json b/.vscode/launch.json index 6142583..3b2755d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,29 +5,31 @@ "version": "0.2.0", "configurations": [ { - "name": "Debug Unit Test", + "name": "Python Debugger: Virtac", "type": "debugpy", "request": "launch", - "justMyCode": false, - "program": "${file}", - "purpose": [ - "debug-test" - ], + "program": "/venv/bin/virtac", "console": "integratedTerminal", + "args": [ + "-v" + ], "env": { - // Enable break on exception when debugging tests (see: tests/conftest.py) - "PYTEST_RAISE": "1", + "EPICS_CA_SERVER_PORT": "8064", + "EPICS_CA_REPEATER_PORT": "8065", }, }, { - "name": "Python Debugger: Virtac", + "name": "Python Debugger: ATIP", "type": "debugpy", "request": "launch", - "program": "/venv/bin/virtac", + "program": "/venv/bin/atip", "console": "integratedTerminal", + "args": [ + "-t" + ], "env": { - "EPICS_CA_SERVER_PORT": "7064", - "EPICS_CA_REPEATER_PORT": "7065", + "EPICS_CA_SERVER_PORT": "8064", + "EPICS_CA_REPEATER_PORT": "8065", }, }, ] diff --git a/INSTALL.rst b/INSTALL.rst index 5e18eac..98e0751 100644 --- a/INSTALL.rst +++ b/INSTALL.rst @@ -31,6 +31,3 @@ Initial Setup and Installation Troubleshooting --------------- - -Please note that for ATIP to function with Python 3.7 or later, you must -use Cothread>=2.16. diff --git a/docs/conf.py b/docs/conf.py index 63ca443..c5399dc 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -88,7 +88,6 @@ ("py:class", "obj"), ("py:class", "class"), ("py:class", "at.lattice.lattice_object.Lattice"), - ("py:class", "cothread.Event"), ] # Both the class’ and the __init__ method’s docstring are concatenated and diff --git a/pyproject.toml b/pyproject.toml index 0b61932..349b78a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,8 +17,8 @@ dependencies = [ "scipy", "pytac>=0.3.0", "accelerator-toolbox>=0.2.0", - "cothread", "softioc", + "aioca", ] dynamic = ["version"] diff --git a/src/atip/__main__.py b/src/atip/__main__.py index cfb9358..1bc089a 100644 --- a/src/atip/__main__.py +++ b/src/atip/__main__.py @@ -1,14 +1,21 @@ """Interface for ``python -m atip``.""" +import asyncio +import logging +import random from argparse import ArgumentParser from collections.abc import Sequence +import pytac + +import atip + from . import __version__ __all__ = ["main"] -def main(args: Sequence[str] | None = None) -> None: +async def async_main(args: Sequence[str] | None = None) -> None: """Argument parser for the CLI.""" parser = ArgumentParser() parser.add_argument( @@ -17,7 +24,39 @@ def main(args: Sequence[str] | None = None) -> None: action="version", version=__version__, ) - parser.parse_args(args) + parser.add_argument( + "-t", + "--run-test", + help="Start an endless test of atip", + action="store_true", + ) + args = parser.parse_args() + + if args.run_test: + # Load the DIAD lattice from Pytac. + lat = await pytac.load_csv.load("DIAD") + await atip.load_sim.load_from_filepath(lat, "../atip/src/atip/rings/DIAD.mat") + # Use the sim by default. + lat.set_default_data_source(pytac.SIM) + # The initial beam position is zero. + print(await lat.get_value("x")) + + # Get the first horizontal corrector magnet and set its current to 1A. + hcor1 = lat.get_elements("HSTR")[0] + while True: + kick: float = random.uniform(0, 2) + print(f"Applying x_kick of {kick}") + await hcor1.set_value("x_kick", kick, units=pytac.ENG) + # Now the x beam position has changed. + print(f"New data: {await lat.get_value('x')}") + await asyncio.sleep(1) + + +def main(args: Sequence[str] | None = None) -> None: + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + # Load the AT sim into the Pytac lattice. + asyncio.run(async_main(args)) if __name__ == "__main__": diff --git a/src/atip/load_sim.py b/src/atip/load_sim.py index 076aaa0..d15a199 100644 --- a/src/atip/load_sim.py +++ b/src/atip/load_sim.py @@ -12,7 +12,7 @@ SIMULATED_FIELDS = {"a1", "b0", "b1", "b2", "x", "y", "f", "x_kick", "y_kick"} -def load_from_filepath( +async def load_from_filepath( pytac_lattice, at_lattice_filepath, callback=None, disable_emittance=False ): """Load simulator data sources onto the lattice and its elements. @@ -32,12 +32,12 @@ def load_from_filepath( at_lattice = at.load.load_mat( at_lattice_filepath, name=pytac_lattice.name, - energy=pytac_lattice.get_value("energy"), + energy=await pytac_lattice.get_value("energy"), ) - return load(pytac_lattice, at_lattice, callback, disable_emittance) + return await load(pytac_lattice, at_lattice, callback, disable_emittance) -def load(pytac_lattice, at_lattice, callback=None, disable_emittance=False): +async def load(pytac_lattice, at_lattice, callback=None, disable_emittance=False): """Load simulator data sources onto the lattice and its elements. Args: @@ -58,7 +58,9 @@ def load(pytac_lattice, at_lattice, callback=None, disable_emittance=False): f"(AT:{len(at_lattice)} Pytac:{len(pytac_lattice)})." ) # Initialise an instance of the ATSimulator Object. - atsim = ATSimulator(at_lattice, callback, disable_emittance) + atsim = await ATSimulator.create( + at_lattice, callback, disable_emittance + ) # TODO: This feels like a wierd place to create the simulator? # Set the simulator data source on the Pytac lattice. pytac_lattice.set_data_source(ATLatticeDataSource(atsim), pytac.SIM) # Load the sim onto each element. diff --git a/src/atip/sim_data_sources.py b/src/atip/sim_data_sources.py index cb88528..9e5e38f 100644 --- a/src/atip/sim_data_sources.py +++ b/src/atip/sim_data_sources.py @@ -123,7 +123,7 @@ def add_field(self, field): else: self._fields.append(field) - def get_value(self, field, handle=None, throw=True): + async def get_value(self, field, handle=None, throw=True): """Get the value for a field. Args: @@ -148,7 +148,7 @@ def get_value(self, field, handle=None, throw=True): # Wait for any outstanding calculations to conclude, to ensure they are # complete before a value is returned; if the wait times out then raise # an error message or log a warning according to the value of throw. - if not self._atsim.wait_for_calculations(): + if not await self._atsim.wait_for_calculations(): error_msg = "Check for completion of outstanding calculations timed out." if throw: raise ControlSystemException(error_msg) @@ -160,7 +160,7 @@ def get_value(self, field, handle=None, throw=True): else: raise FieldException(f"No field {field} on AT element {self._at_element}.") - def set_value(self, field, value, throw=None): + async def set_value(self, field, value, throw=None): """Set the value for a field. The field and value go onto the queue of changes on the ATSimulator to be passed to make_change when the queue is emptied. @@ -178,7 +178,7 @@ def set_value(self, field, value, throw=None): """ if field in self._fields: if field in self._set_field_funcs.keys(): - self._atsim.queue_set(self._make_change, field, value) + await self._atsim.queue_set(self._make_change, field, value) else: raise HandleException( f"Field {field} cannot be set on element data source {self}." @@ -413,7 +413,7 @@ def get_fields(self): """ return list(self._field_funcs.keys()) - def get_value(self, field, handle=None, throw=True): + async def get_value(self, field, handle=None, throw=True): """Get the value for a field on the Pytac lattice. Args: @@ -438,7 +438,7 @@ def get_value(self, field, handle=None, throw=True): # Wait for any outstanding calculations to conclude, to ensure they are # complete before a value is returned; if the wait times out then raise # an error message or log a warning according to the value of throw. - if not self._atsim.wait_for_calculations(): + if not await self._atsim.wait_for_calculations(): error_msg = "Check for completion of outstanding calculations timed out." if throw: raise ControlSystemException(error_msg) diff --git a/src/atip/simulator.py b/src/atip/simulator.py index f51f82b..77e8233 100644 --- a/src/atip/simulator.py +++ b/src/atip/simulator.py @@ -1,11 +1,12 @@ """Module containing an interface with the AT simulator.""" +import asyncio +import concurrent import logging from dataclasses import dataclass from warnings import warn import at -import cothread import numpy from numpy.typing import ArrayLike from pytac.exceptions import DataSourceException, FieldException @@ -73,7 +74,7 @@ class ATSimulator: **Attributes** Attributes: - up_to_date (cothread.Event): A flag that indicates if the physics data + up_to_date (asyncio.Event): A flag that indicates if the physics data is up to date with all the changes made to the AT lattice. @@ -87,18 +88,31 @@ class ATSimulator: envelope based emittance calculations. _lattice_data (LatticeData): calculated physics data function linopt (see at.lattice.linear.py). - _queue (cothread.EventQueue): A queue of changes to be applied to + _queue (asyncio.Queue): A queue of changes to be applied to the centralised lattice on the next recalculation cycle. - _paused (cothread.Event): A flag used to temporarily pause the + _paused (asyncio.Event): A flag used to temporarily pause the physics calculations. - _calculation_thread (cothread.Thread): A thread to check the queue - for new changes to the AT - lattice and recalculate the - physics data upon a change. + _calculation_task (asyncio.Task): A task to check the queue + for new changes to the AT + lattice and recalculate the + physics data upon a change. + _new_data_lock (asyncio.Lock): A lock which can be taken + to stop new caput callbacks + being added to the queue while + held. """ - def __init__(self, at_lattice, callback=None, disable_emittance=False): + _loop: asyncio.BaseEventLoop + _queue: asyncio.Queue + _paused: asyncio.Event + _quit_thread: asyncio.Event + _up_to_date: asyncio.Event + _calculation_task: asyncio.Task + _new_data_lock: asyncio.Lock + + @classmethod + async def create(cls, at_lattice, callback=None, disable_emittance=False): """ .. Note:: To avoid errors, the physics data must be initially calculated here, during creation, otherwise it could be accidentally @@ -116,6 +130,7 @@ def __init__(self, at_lattice, callback=None, disable_emittance=False): **Methods:** """ + self = cls() if (not callable(callback)) and (callback is not None): raise TypeError( f"If passed, 'callback' should be callable, {callback} is not." @@ -130,17 +145,20 @@ def __init__(self, at_lattice, callback=None, disable_emittance=False): self._at_lat, self._rp, self._disable_emittance ) - # Threading stuff initialisation. - self._queue = cothread.EventQueue() - # Explicitly manage the cothread Events, so turn off auto_reset. - # These are False when reset, True when signalled. - self._paused = cothread.Event(auto_reset=False) - self._quit_thread = cothread.Event(auto_reset=False) - self.up_to_date = cothread.Event(auto_reset=False) - self.up_to_date.Signal() - self._calculation_thread = cothread.Spawn(self._recalculate_phys_data, callback) - - def queue_set(self, func, field, value): + self._loop = asyncio.get_event_loop() # TODO: check a loop is running + self._queue = asyncio.Queue() + self._paused = asyncio.Event() + self._quit_thread = asyncio.Event() + self._up_to_date = asyncio.Event() + self._up_to_date.set() + self._new_data_lock = asyncio.Lock() + + self._calculation_task = asyncio.create_task( + self._recalculate_phys_data(callback) + ) # This task should last the lifetime of the program + return self + + async def queue_set(self, func, field, value): """Add a change to the queue, to be applied when the queue is emptied. Args: @@ -148,31 +166,50 @@ def queue_set(self, func, field, value): field (str): The field to be changed. value (float): The value to be set. """ - cothread.CallbackResult(self.up_to_date.Reset) - cothread.Callback(self._queue.Signal, (func, field, value)) - - def _gather_one_sample(self): + async with self._new_data_lock: + # Clear first otherwise it is possible to yield to another thread which will + # then think the lattice is up to date with the most recently accepted caput + # /lattice.set_value. when it isnt. + self._up_to_date.clear() + await self._queue.put((func, field, value)) + # If this flag gets cleared while we are recalculating, then it can cause + # everything to lock, so we setup a lock between this function and the + # recalculate function + logging.debug(f"Added task to async queue. qsize={self._queue.qsize()}") + + async def _gather_one_sample(self): """If the queue is empty Wait() yields until an item is added. When the queue is not empty the oldest change will be removed and applied to the AT lattice. """ - apply_change_method, field, value = self._queue.Wait() + logging.debug("Waiting for new item in queue") + apply_change_method, field, value = await self._queue.get() apply_change_method(field, value) + logging.debug("Processed item from queue") - def quit_calculation_thread(self, timeout=10): + async def cancel_calculation_task(self, timeout=10): """Quit the calculation thread after the current loop is complete.""" - cothread.CallbackResult(self._quit_thread.Signal) - self.trigger_calculation() - cothread.CallbackResult(self._calculation_thread.Wait, timeout) - # For some reason we have to wait a bit before we can clear the queue. - cothread.Sleep(0.1) - cothread.CallbackResult(self._queue.Reset) - - def _recalculate_phys_data(self, callback): - """Target function for the Cothread thread. Recalculates the physics + # TODO: Im not really sure what the purpose of this function is, as it + # kills all functionality in the virtac and there is no way to restart + # it? It also isnt called anywhere, although could be from the python shell + + # Do one last calculation and then wait 0.5 seconds to give pvs a chance + # to be updated + await self.trigger_calculation() + await self._quit_thread.set() + await asyncio.sleep(0.5) + tasks = asyncio.all_tasks() + for task in tasks: + await task.cancel() + + async def _recalculate_phys_data(self, callback): + """Run as a never ending asyncio task. Recalculates the physics data dependent on the status of the '_paused' flag and the length of the queue. The calculations only take place if '_paused' is False and - there are one or more changes on the queue. + there are one or more changes on the queue. After doing the recalculations, + we set _up_to_date flag to signal this and run any passed callback functions. + For VIRTAC, the passed callback is ATIPServer.update_pvs which updates all + softioc PVs with the fresh data. .. Note:: If an error or exception is raised in the running thread then it does not continue running so subsequent calculations are not @@ -187,33 +224,46 @@ def _recalculate_phys_data(self, callback): at.AtWarning: any error or exception that was raised in the thread, but as a warning. """ - # Using Cothread Event is only ~4% slower than a normal Boolean but much safer. - while not self._quit_thread: - logging.debug("Starting recalculation loop") - self._gather_one_sample() - while self._queue: - self._gather_one_sample() - if bool(self._paused) is False: - try: - self._lattice_data = calculate_optics( - self._at_lat, self._rp, self._disable_emittance - ) - except Exception as e: - warn(at.AtWarning(e), stacklevel=1) - # Signal the up to date flag since the physics data is now up to date. - # We do this before the callback is executed in case the callback - # checks the flag. - self.up_to_date.Signal() - if callback is not None: - logging.debug("Executing callback function.") - callback() - logging.debug("Callback completed.") - # This could cause thread-locking issues if another application using - # cothreads is poorly written and running on the same machine, but this - # isn't the case with any of the software that currently runs against - # ATIP/virtac and it's required for cothread.CallbackResult to play nicely - # with Bluesky. So it's a calculated risk until we move away from Cothread. - cothread.Yield() + logging.debug("Starting recalculation loop") + while not self._quit_thread.is_set(): + await self._gather_one_sample() + while not self._queue.empty(): + await self._gather_one_sample() + logging.debug("Recaulculating simulation with new setpoints.") + if not self._paused.is_set(): + async with self._new_data_lock: + with concurrent.futures.ProcessPoolExecutor() as pool: + try: + self._lattice_data = await self._loop.run_in_executor( + pool, + calculate_optics, + self._at_lat, + self._rp, + self._disable_emittance, + ) + except Exception as e: + # If an error is found while doing the calculations we dont + # update lattice data. TODO: We currently update the pvs + # anyway but this wont do anything, so could be improved + warn(at.AtWarning(e), stacklevel=1) + + # Signal the up to date flag since the physics data is now up to + # date. We do this before the callback is executed in case the + # callback checks the flag. + self._up_to_date.set() + logging.debug("Simulation up to date.") + if callback is not None: + logging.debug( + f"Executing callback function: {callback.__name__}" + ) + # For Virtac this function calls update_pvs() which gets data + # from the pytac datasource to update the softioc pvs with. The + # data source is sim_data_sources.py and its get_value() + # function waits on the wait_for_calculation() function which + # waits for the up_to_date flag which currently will always be + # set, so this process is pointless. + await callback() + logging.debug("Callback completed.") def toggle_calculations(self): """Pause or unpause the physics calculations by setting or clearing the @@ -221,37 +271,38 @@ def toggle_calculations(self): .. Note:: This does not pause the emptying of the queue. """ - if self._paused: - cothread.CallbackResult(self._paused.Reset) + if self._paused.is_set(): + self._paused.clear() else: - cothread.CallbackResult(self._paused.Signal) + self._paused.set() def pause_calculations(self): """Pause the physics calculations by setting the _paused flag. .. Note:: This does not pause the emptying of the queue. """ - if not self._paused: - cothread.CallbackResult(self._paused.Signal) + # TODO: These dont currently get called anyway, maybe add a pv to call them? + if not self._paused.is_set(): + self._paused.set() - def unpause_calculations(self): + async def unpause_calculations(self): """Unpause the physics calculations by clearing the _paused flag.""" - if self._paused: - cothread.CallbackResult(self._paused.Reset) - if not self.up_to_date: - self.trigger_calculation() + if self._paused.is_set(): + await self._paused.clear() + if not self._up_to_date: + await self.trigger_calculation() - def trigger_calculation(self): + async def trigger_calculation(self): """Unpause the physics calculations and add a null item to the queue to trigger a recalculation. .. Note:: This method does not wait for the recalculation to complete, that is up to the user. """ - self.unpause_calculations() + await self.unpause_calculations() self.queue_set(lambda *x: None, None, None) - def wait_for_calculations(self, timeout=10): + async def wait_for_calculations(self, timeout=10): """Wait until the physics calculations have taken account of all changes to the AT lattice, i.e. the physics data is fully up to date. @@ -263,9 +314,9 @@ def wait_for_calculations(self, timeout=10): concluded, else True. """ try: - cothread.CallbackResult(self.up_to_date.Wait, timeout) + await asyncio.wait_for(self._up_to_date.wait(), timeout) return True - except cothread.Timedout: + except asyncio.exceptions.TimeoutError: return False # Get lattice related data: diff --git a/src/atip/utils.py b/src/atip/utils.py index eb57037..7953b52 100644 --- a/src/atip/utils.py +++ b/src/atip/utils.py @@ -29,7 +29,7 @@ def load_at_lattice(mode="I04", **kwargs): return at_lattice -def loader(mode="I04", callback=None, disable_emittance=False): +async def loader(mode="I04", callback=None, disable_emittance=False): """Load a unified lattice of the specifed mode. .. Note:: A unified lattice is a Pytac lattice where the corresponding AT @@ -46,13 +46,15 @@ def loader(mode="I04", callback=None, disable_emittance=False): pytac.lattice.Lattice: A Pytac lattice object with the simulator data source loaded. """ - pytac_lattice = pytac.load_csv.load(mode, symmetry=24) + pytac_lattice = await pytac.load_csv.load(mode, symmetry=24) at_lattice = load_at_lattice( mode, periodicity=1, - energy=pytac_lattice.get_value("energy"), + energy=await pytac_lattice.get_value("energy"), + ) + lattice = await atip.load_sim.load( + pytac_lattice, at_lattice, callback, disable_emittance ) - lattice = atip.load_sim.load(pytac_lattice, at_lattice, callback, disable_emittance) return lattice diff --git a/src/virtac/atip_ioc_entry.py b/src/virtac/atip_ioc_entry.py index cf1f89e..08f70dd 100644 --- a/src/virtac/atip_ioc_entry.py +++ b/src/virtac/atip_ioc_entry.py @@ -1,13 +1,13 @@ import argparse +import asyncio import logging import os import socket from pathlib import Path from warnings import warn -import epicscorelibs.path.cothread # noqa -from cothread.catools import ca_nothing, caget -from softioc import builder, softioc +from aioca import CANothing, caget +from softioc import asyncio_dispatcher, builder, softioc from . import atip_server @@ -47,11 +47,12 @@ def parse_arguments(): return parser.parse_args() -def main(): +async def async_main(): """Main entrypoint for virtac. Executed when running the 'virtac' command""" args = parse_arguments() + loop = asyncio.get_event_loop() # TODO: check a loop is running log_level = logging.DEBUG if args.verbose else logging.INFO - logging.basicConfig(level=log_level, format=LOG_FORMAT) + logging.getLogger().setLevel(log_level) # Determine the ring mode if args.ring_mode is not None: @@ -61,19 +62,19 @@ def main(): ring_mode = str(os.environ["RINGMODE"]) except KeyError: try: - value = caget("SR-CS-RING-01:MODE", timeout=0.5, format=2) + value = await caget("SR-CS-RING-01:MODE", timeout=0.5, format=2) ring_mode = value.enums[int(value)] logging.warning( f"Ring mode not specified, using value from real " f"machine as default: {value}" ) - except ca_nothing: + except CANothing: ring_mode = "I04" logging.warning(f"Ring mode not specified, using default: {ring_mode}") # Create PVs. logging.debug("Creating ATIP server") - server = atip_server.ATIPServer( + server: atip_server.ATIPServer = await atip_server.ATIPServer.create( ring_mode, DATADIR / ring_mode / "limits.csv", DATADIR / ring_mode / "bba.csv", @@ -117,10 +118,28 @@ def main(): os.environ["EPICS_CAS_AUTO_BEACON_ADDR_LIST"] = "NO" # Start the IOC. + dispatcher = asyncio_dispatcher.AsyncioDispatcher(loop=loop) builder.LoadDatabase() - softioc.iocInit() + softioc.iocInit(dispatcher) server.monitor_mirrored_pvs() if args.enable_tfb: server.setup_tune_feedback() - context = globals() | {"server": server} - softioc.interactive_ioc(context) + + # context = globals() | {"server": server} + # softioc.interactive_ioc(context) + # Leaving the IOC in interactive mode can interfere with logging of + # error messages, try commenting out the above line and uncommenting + # the below to run without the interactive ioc shell. + # TODO: Make interactive ioc work with logging + while True: + logging.debug("Sleeping") + await asyncio.sleep(10) + + +def main(): + logging.basicConfig( + format="%(asctime)s.%(msecs)03d %(levelname)-4s %(message)s", + datefmt="%H:%M:%S", + ) + # Load the AT sim into the Pytac lattice. + asyncio.run(async_main()) diff --git a/src/virtac/atip_server.py b/src/virtac/atip_server.py index 5bb2758..a07f33d 100644 --- a/src/virtac/atip_server.py +++ b/src/virtac/atip_server.py @@ -5,7 +5,7 @@ import numpy import pytac -from cothread.catools import camonitor +from aioca import camonitor from pytac.device import SimpleDevice from pytac.exceptions import FieldException, HandleException from softioc import builder @@ -59,8 +59,9 @@ class ATIPServer: virtual accelerator and the pv object itself. """ - def __init__( - self, + @classmethod + async def create( + cls, ring_mode, limits_csv=None, bba_csv=None, @@ -89,7 +90,10 @@ def __init__( information see create_csv.py. disable_emittance (bool): Whether the emittance should be disabled. """ - self.lattice = atip.utils.loader(ring_mode, self.update_pvs, disable_emittance) + self = cls() + self.lattice = await atip.utils.loader( + ring_mode, self.update_pvs, disable_emittance + ) self.tune_feedback_status = False self._pv_monitoring = False self._tune_fb_csv_path = tune_csv @@ -103,7 +107,7 @@ def __init__( self._offset_pvs = {} self._record_names = {} print("Starting record creation.") - self._create_records(limits_csv, disable_emittance) + await self._create_records(limits_csv, disable_emittance) if bba_csv is not None: self._create_bba_records(bba_csv) if feedback_csv is not None: @@ -111,12 +115,13 @@ def __init__( if mirror_csv is not None: self._create_mirror_records(mirror_csv) print(f"Finished creating all {len(self._record_names)} records.") + return self def _update_record_names(self, records): """Updates _record_names using the supplied list of softioc record objects.""" self._record_names |= {record.name: record for record in list(records)} - def update_pvs(self): + async def update_pvs(self): """The callback function passed to ATSimulator during lattice creation, it is called each time a calculation of physics data is completed. It updates all the in records that do not have a corresponding out record @@ -129,17 +134,17 @@ def update_pvs(self): # index 0 is the lattice itself rather than an element for index in indexes: if index == 0: - value = self.lattice.get_value( + value = await self.lattice.get_value( field, units=pytac.ENG, data_source=pytac.SIM ) rb_record.set(value) else: - value = self.lattice[index - 1].get_value( + value = await self.lattice[index - 1].get_value( field, units=pytac.ENG, data_source=pytac.SIM ) rb_record.set(value) - def _create_records(self, limits_csv, disable_emittance): + async def _create_records(self, limits_csv, disable_emittance): """Create all the standard records from both lattice and element Pytac fields. Several assumptions have been made for simplicity and efficiency, these are: @@ -177,7 +182,7 @@ def _create_records(self, limits_csv, disable_emittance): self._in_records[bend_in_record][0].append(element.index) else: for field in element.get_fields()[pytac.SIM]: - value = element.get_value( + value = await element.get_value( field, units=pytac.ENG, data_source=pytac.SIM ) get_pv = element.get_pv_name(field, pytac.RB) @@ -228,7 +233,7 @@ def _create_records(self, limits_csv, disable_emittance): # Ignore basic devices as they do not have PVs. if not isinstance(self.lattice.get_device(field), SimpleDevice): get_pv = self.lattice.get_pv_name(field, pytac.RB) - value = self.lattice.get_value( + value = await self.lattice.get_value( field, units=pytac.ENG, data_source=pytac.SIM ) builder.SetDeviceName(get_pv.split(":", 1)[0]) @@ -242,7 +247,7 @@ def _create_records(self, limits_csv, disable_emittance): ) print("~*~*Woah, we're halfway there, Wo-oah...*~*~") - def _on_update(self, value, name): + async def _on_update(self, value, name): """The callback function passed to out records, it is called after successful record processing has been completed. It updates the out record's corresponding in record with the value that has been set and @@ -266,7 +271,7 @@ def _on_update(self, value, name): pass for i in index: - self.lattice[i - 1].set_value( + await self.lattice[i - 1].set_value( field, value, units=pytac.ENG, data_source=pytac.SIM ) @@ -465,7 +470,7 @@ def monitor_mirrored_pvs(self): for pv, output in self._mirrored_records.items(): mask = callback_set(output) try: - self._monitored_pvs[pv] = camonitor(pv, mask.callback) + self._monitored_pvs[pv] = camonitor(pv, callback=mask.callback) except Exception as e: warn(e, stacklevel=1) @@ -516,7 +521,7 @@ def setup_tune_feedback(self, tune_csv=None): try: self._monitored_pvs[line["delta"]] = camonitor( line["delta"], mask.callback - ) + ) # TODO: Could be a lambda instead except Exception as e: warn(e, stacklevel=1) diff --git a/src/virtac/create_csv.py b/src/virtac/create_csv.py index 18c4c7a..6b6a9f4 100644 --- a/src/virtac/create_csv.py +++ b/src/virtac/create_csv.py @@ -3,12 +3,13 @@ """ import argparse +import asyncio import csv import os import numpy import pytac -from cothread.catools import FORMAT_CTRL, caget +from aioca import FORMAT_CTRL, caget import atip @@ -149,7 +150,7 @@ def generate_bba_pvs(all_elements, symmetry): return data -def generate_pv_limits(lattice): +async def generate_pv_limits(lattice): """Get the control limits and precision values from the live machine for all normal PVS. @@ -161,7 +162,7 @@ def generate_pv_limits(lattice): for element in lattice: for field in element.get_fields()[pytac.SIM]: pv = element.get_pv_name(field, pytac.RB) - ctrl = caget(pv, format=FORMAT_CTRL) + ctrl = await caget(pv, format=FORMAT_CTRL) data.append( ( pv, @@ -177,7 +178,7 @@ def generate_pv_limits(lattice): except pytac.exceptions.HandleException: pass else: - ctrl = caget(pv, format=FORMAT_CTRL) + ctrl = await caget(pv, format=FORMAT_CTRL) data.append( ( pv, @@ -191,7 +192,7 @@ def generate_pv_limits(lattice): return data -def generate_mirrored_pvs(lattice): +async def generate_mirrored_pvs(lattice): """Structure of data: output type: @@ -230,8 +231,8 @@ def generate_mirrored_pvs(lattice): ] # Tune PV aliases. tune = [ - lattice.get_value("tune_x", pytac.RB, data_source=pytac.SIM), - lattice.get_value("tune_y", pytac.RB, data_source=pytac.SIM), + await lattice.get_value("tune_x", pytac.RB, data_source=pytac.SIM), + await lattice.get_value("tune_y", pytac.RB, data_source=pytac.SIM), ] data.append( ( @@ -253,8 +254,8 @@ def generate_mirrored_pvs(lattice): ) # Combined emittance and average emittance PVs. emit = [ - lattice.get_value("emittance_x", pytac.RB, data_source=pytac.SIM), - lattice.get_value("emittance_y", pytac.RB, data_source=pytac.SIM), + await lattice.get_value("emittance_x", pytac.RB, data_source=pytac.SIM), + await lattice.get_value("emittance_y", pytac.RB, data_source=pytac.SIM), ] data.append( ("aIn", "basic", "SR-DI-EMIT-01:HEMIT", "SR-DI-EMIT-01:HEMIT_MEAN", emit[0]) @@ -409,9 +410,9 @@ def parse_arguments(): return parser.parse_args() -if __name__ == "__main__": +async def main(): args = parse_arguments() - lattice = atip.utils.loader(args.ring_mode) + lattice = await atip.utils.loader(args.ring_mode) all_elements = atip.utils.preload(lattice) print("Creating feedback PVs CSV file.") data = generate_feedback_pvs(all_elements, lattice) @@ -420,11 +421,15 @@ def parse_arguments(): data = generate_bba_pvs(all_elements, lattice.symmetry) write_data_to_file(data, args.bba, args.ring_mode) print("Creating limits PVs CSV file.") - data = generate_pv_limits(lattice) + data = await generate_pv_limits(lattice) write_data_to_file(data, args.limits, args.ring_mode) print("Creating mirrored PVs CSV file.") - data = generate_mirrored_pvs(lattice) + data = await generate_mirrored_pvs(lattice) write_data_to_file(data, args.mirrored, args.ring_mode) print("Creating tune PVs CSV file.") data = generate_tune_pvs(lattice) write_data_to_file(data, args.tune, args.ring_mode) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/virtac/masks.py b/src/virtac/masks.py index 7b51155..b7fb756 100644 --- a/src/virtac/masks.py +++ b/src/virtac/masks.py @@ -1,4 +1,6 @@ -from cothread.catools import caget, caput +import inspect + +from aioca import caget, caput class callback_offset: @@ -41,7 +43,7 @@ def __init__(self, output): """ self.output = output - def callback(self, value, index=None): + async def callback(self, value, index=None): """When called set the passed value to all held output records. Args: @@ -49,7 +51,10 @@ def callback(self, value, index=None): index (int): Ignored, only there to support camonitor multiple. """ for record in self.output: - record.set(value) + if inspect.iscoroutinefunction(record.set): + await record.set(value) + else: + record.set(value) class caget_mask: @@ -63,8 +68,8 @@ def __init__(self, pv): self.pv = pv self.name = pv - def get(self): - return caget(self.pv) + async def get(self): + return await caget(self.pv) class caput_mask: @@ -78,9 +83,9 @@ def __init__(self, pv): self.pv = pv self.name = pv - def set(self, value): + async def set(self, value): """ Args: value (number): The value to caput to the PV. """ - return caput(self.pv, value) + return await caput(self.pv, value)