acsys.dpm_tutorial
The DPM API closely follows the underlying protocol of the Data Pool Manager (DPM): create a DPM context, add a set of requests using the DRF format, and start acquisition. In this tutorial, we'll start simple and build up to more advanced examples.
This example doesn't actually interact with a DPM; it queries the environment to see which DPMs are available. Although this is a useful diagnostic tool, normal Python scripts will simply use the service discovery built into the API to attach to a DPM.
#!/usr/bin/env python3
import logging
import acsys.dpm
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT)
log = logging.getLogger('acsys')
log.setLevel(logging.INFO)
async def my_app(con):
    dpms = await acsys.dpm.available_dpms(con)
    log.info('available DPMs: %s', str(dpms))
acsys.run_client(my_app)
acsys.dpm.available_dpms() multicasts a service discovery request and keeps track of who responds. The list of repliers is returned.
In this example we'll read the outside temperature (M:OUTTMP) at 1 Hz.
The first thing we need to do is set up a Context Manager which handles the connection to DPM. Python Context Managers are used in with-statements and allow set-up and clean-up guarantees. In this package, DPMContext performs this task and, when used in an async-with-statement, returns a DPM object:
async with acsys.dpm.DPMContext(con) as dpm:
    # The 'dpm' object is valid in this block and will get properly
    # cleaned up no matter how this block is exited.
The dpm object is ready to return data. However, at the moment, it won't return anything since no acquisition requests have been specified, so let's specify one:
await dpm.add_entry(0, 'M:OUTTMP@p,1000')
The .add_entry() method sends a request to DPM to add the DRF specification and to associate it with the tag 0. Replies delivered by the DPM iterator will have the readings, timestamps, and tags. The script can use the tag to identify the source of the reading. In this case, we're only reading one device so the tag isn't very useful.
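To make the request format concrete, here is a minimal sketch of building periodic DRF strings like the one above. The helper name periodic_drf is hypothetical and is not part of acsys; it only assumes, as in this tutorial's example, that '@p,1000' asks for a reading every 1000 ms (1 Hz).

```python
def periodic_drf(device: str, hz: float) -> str:
    """Build a periodic DRF request string.

    Hypothetical helper for illustration; not part of the acsys package.
    """
    if hz <= 0:
        raise ValueError('rate must be positive')
    # Convert the rate in Hz to a period in milliseconds.
    period_ms = round(1000 / hz)
    return f'{device}@p,{period_ms}'


# Tag/DRF pairs in the order they would be passed to dpm.add_entry(tag, drf).
entries = list(enumerate([periodic_drf('M:OUTTMP', 1.0)]))
print(entries)
```

Keeping the tag/DRF pairs in one list makes it easy to look up which device a tag refers to once replies start arriving.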
Next we tell DPM to start data acquisition:
await dpm.start()
When .start() completes, the dpm object will generate replies. If all requests were properly formed and included valid device names, all replies will be instances of acsys.dpm.ItemData. If one of the entries was bad, its reply will be an instance of acsys.dpm.ItemStatus containing its tag and an ACNET status indicating the failure. The rest of the valid requests will continue to get ItemData replies.
ItemData objects have several properties: .tag contains the tag value given in the corresponding .add_entry() call; .stamp contains a datetime object holding the timestamp of the data; and .data holds the data of the reading.
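The sketch below shows one way a script might tell the two reply kinds apart and use the properties just described. FakeItemData and FakeItemStatus are hypothetical stand-ins, defined only so the snippet runs without a DPM connection; they mirror the .tag, .stamp, .data, and .status properties mentioned in this tutorial and are not the real acsys.dpm classes. The status is a plain string here, whereas a real reply carries an ACNET status.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class FakeItemData:
    """Stand-in for a reading reply (hypothetical, not acsys.dpm.ItemData)."""
    tag: int
    stamp: datetime
    data: float


@dataclass
class FakeItemStatus:
    """Stand-in for a status reply (hypothetical, not acsys.dpm.ItemStatus)."""
    tag: int
    status: str


def describe(reply, names):
    """Format a reply, using its tag to look up the device name."""
    name = names.get(reply.tag, f'tag {reply.tag}')
    if isinstance(reply, FakeItemData):
        return f'{name} = {reply.data} at {reply.stamp.isoformat()}'
    return f'{name} failed: {reply.status}'


names = {0: 'M:OUTTMP'}
stamp = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(describe(FakeItemData(0, stamp, 71.5), names))
print(describe(FakeItemStatus(1, 'bad request'), names))
```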
After setting up the data requests and starting the acquisition, scripts will enter an async-for-loop to read the data:
async for ii in dpm.replies():
    # 'ii' will be an instance of ItemData or ItemStatus
For this example, let's just print the item returned from the generator. (As of this writing, DPM is incorrectly sending out a keep-alive message that should only be sent if the link is quiet for 5 seconds.) The whole example is, then:
#!/usr/bin/env python3
import logging
import acsys.dpm
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT)
log = logging.getLogger('acsys')
log.setLevel(logging.INFO)
async def my_app(con):
    # Setup context
    async with acsys.dpm.DPMContext(con) as dpm:
        # Add acquisition requests
        await dpm.add_entry(0, 'M:OUTTMP@p,1000')
        # Start acquisition
        await dpm.start()
        # Process incoming data
        async for ii in dpm.replies():
            log.info(str(ii))
acsys.run_client(my_app)
Since Python is a dynamically-typed language, the .data field will be of the type specified in the request. For instance, change the DRF string in Example 2 to 'M:OUTTMP.RAW@p,1000' and see that the data returned is raw binary.
The DPMContext object sets up an environment for getting device readings: it finds an available DPM, connects to it, and returns the DPM object. An optional parameter to its constructor allows you to pick a particular DPM rather than get a random one from the pool. This should only be used for testing new features on a test DPM because, if you specify a particular DPM and it gets taken down, your script will get continuous errors until the DPM is restarted. When you use one from the pool and it goes down, another one is chosen.
In those rare cases that you need a particular DPM, you can do this:
# Force DPM01 to be our DPM.
async with acsys.dpm.DPMContext(con, dpm_node='DPM01') as dpm:
    # The 'dpm' object is valid in this block and will always be connected
    # to DPM01.
For readings, a set of tag/DRF strings is registered with DPM and acquisition is started. Then the script enters a for-loop which generates reading and status replies. The script can look at the tag field in these replies to associate the reading or status with the correct device. We tried to make settings support fit into this API so scripts can get readings and do settings in the same loop.
For settings, you add a tag/DRF entry (using .add_entry()) for each device you wish to set. Then you use .apply_settings() to send values for DPM to set. The status of the setting will get reported in an ItemStatus record. Note that this example is using made-up device names so it won't actually work! If you change them to cache devices, you could simulate a working system.
Let's try an example. Let's assume we have a temperature device, Z:RMTEMP, that returns the room's temperature in degrees Fahrenheit. Let's also assume a device that controls a fan, Z:RMFAN, which can be set between 0% and 100%. We're going to read the temperature every 10 seconds and set the fan accordingly; if it's colder than 70 degrees, the fan is off. If it's hotter than 90 degrees, it's on 100%. We'll linearly scale it between those extremes. The program would look like this:
#!/usr/bin/env python3
import logging
import acsys.dpm
from acsys.dpm import (ItemData, ItemStatus)
FORMAT = '%(asctime)-15s [%(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT)
log = logging.getLogger('acsys')
log.setLevel(logging.INFO)
async def my_app(con):
    async with acsys.dpm.DPMContext(con) as dpm:
        await dpm.enable_settings(role='testing')
        # Set-up our two devices. The second device uses the "N" event,
        # which never occurs, so it won't generate replies.
        await dpm.add_entry(0, 'Z:RMTEMP@p,10s')
        await dpm.add_entry(1, 'Z:RMFAN.SETTING@N')
        # Start up data acquisition.
        await dpm.start()
        # Loop through the stream of replies.
        async for ii in dpm.replies():
            # Look for readings from the room temperature.
            if ii.is_reading_for(0):
                fan_duty = min(max(ii.data - 70, 0), 20) * 5.0
                await dpm.apply_settings([(1, fan_duty)])
            # Look for results from setting the fan. If fatal,
            # log a warning.
            elif ii.is_status_for(1):
                if ii.status.isFatal:
                    log.warning('cannot set fan speed: %s', ii.status)
acsys.run_client(my_app)
Note that the DRF string for the fan device is asking for the setting property. Note also that, since the script doesn't care about the reading of the fan's setting, we specified the N (i.e. "never") event to it. If DPM sees a request using the "never" event, it doesn't include the device in requests sent to front-ends.
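The scaling expression used for the fan in the example above can be checked on its own, without any devices. This is a minimal sketch that wraps the same expression in a function (the function form and the sample temperatures are added here for illustration) to show that it yields 0% at or below 70 degrees, 100% at or above 90 degrees, and a linear ramp in between.

```python
def fan_duty(temp_f: float) -> float:
    """Map a temperature in degrees Fahrenheit to a fan duty cycle in percent."""
    # Clamp the degrees above 70 F to the range [0, 20], then scale
    # that 20-degree span onto 0-100%.
    return min(max(temp_f - 70, 0), 20) * 5.0


for temp in (60, 70, 80, 90, 100):
    print(temp, fan_duty(temp))
```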