-
Notifications
You must be signed in to change notification settings - Fork 0
/
master_ioc.py
111 lines (87 loc) · 3.86 KB
/
master_ioc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# J. Maxwell 2023
from softioc import softioc, builder, asyncio_dispatcher
import asyncio
import yaml
import argparse
import importlib
import os
import datetime
async def main():
    """
    Start one soft IOC for the device chosen on the command line.

    Steps: load the settings and records, point EPICS Channel Access at the
    configured address list, set the PV prefix, build the device's PVs, load
    the database and initialise the IOC, hand the endless device polling
    coroutine to the dispatcher, then open the interactive IOC shell.
    """
    ioc, settings, records = load_settings()
    general = settings['general']

    # Use only the address list from the settings file; automatic address
    # discovery is switched off. The matching beacon settings
    # (EPICS_CAS_BEACON_ADDR_LIST / EPICS_CAS_AUTO_BEACON_ADDR_LIST) are
    # intentionally left unset.
    os.environ.update({
        'EPICS_CA_ADDR_LIST': general['epics_addr_list'],
        'EPICS_CA_AUTO_ADDR_LIST': 'NO',
    })

    ioc_dispatcher = asyncio_dispatcher.AsyncioDispatcher()
    prefix = general['prefix']
    builder.SetDeviceName(prefix)

    # Constructing the DeviceIOC creates the PVs and connects to the device.
    device_ioc = DeviceIOC(prefix, ioc, settings, records)

    builder.LoadDatabase()
    softioc.iocInit(ioc_dispatcher)

    async def poll_forever():
        # Each pass sleeps for the device delay and then reads the device.
        while True:
            await device_ioc.loop()

    ioc_dispatcher(poll_forever)  # coroutines to keep running go here
    softioc.interactive_ioc(globals())
class DeviceIOC():
    """Create the PVs for one device IOC and poll the device for readings.
    """

    def __init__(self, device_name, ioc, settings, records):
        """
        Import the device module named in the settings, connect to the device,
        create the IOC's timestamp PV and apply any per-PV record fields.

        Arguments:
            device_name: name of device for PV prefix
            ioc: key of this IOC's section in the settings dict
            settings: dict of device settings; settings[ioc] must provide
                'module' and 'delay'
            records: dict of record settings, keyed by PV name
        """
        ioc_settings = settings[ioc]

        self.module = importlib.import_module(ioc_settings['module'])
        self.records = records
        self.delay = ioc_settings['delay']
        self.now = datetime.datetime.now()

        self.device = self.module.Device(device_name, ioc_settings)
        self.device.connect()

        # PV holding the time of the last successful update, seeded with "now".
        self.pv_time = builder.aIn(f"MAN:{ioc}_time")
        started = datetime.datetime.now()
        self.pv_time.set(started.timestamp())

        # Optional: copy the configured record fields onto the matching PVs.
        for name, pv in self.device.pvs.items():
            if name not in self.records:
                continue
            for field, value in self.records[name].items():
                setattr(pv, field, value)

    async def loop(self):
        """Wait for the configured delay (seconds), then ask the device for new readings.
        When the read reports success, store the current time in the timestamp PV.
        """
        await asyncio.sleep(self.delay)
        read_ok = await self.device.do_reads()
        if read_ok:
            stamp = datetime.datetime.now().timestamp()
            self.pv_time.set(stamp)  # time of last successful update
def load_settings():
    """Load device settings and records from YAML settings files.

    Command line arguments: '-s' gives the folder holding 'settings.yaml' and
    'records.yaml' (default is the current directory), '-i' tells which IOC to run.

    Returns:
        Tuple (ioc, settings, records): the chosen IOC name, the parsed
        settings file and the parsed records file.

    Raises:
        SystemExit: if '-i' is missing or names an IOC that is not an entry of
            the settings file. The available IOC names are printed first.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-s", help="Settings file folder, default is here.")
    parser.add_argument("-i", help="Name of IOC to start")
    args = parser.parse_args()

    folder = args.s if args.s else '.'

    # NOTE(review): yaml.FullLoader can build more than plain data types; if
    # these files could ever come from an untrusted source, switch to
    # yaml.safe_load.
    with open(f'{folder}/settings.yaml') as f:
        settings = yaml.load(f, Loader=yaml.FullLoader)
    print(f"Loaded device settings from {folder}/settings.yaml.")

    with open(f'{folder}/records.yaml') as f:
        records = yaml.load(f, Loader=yaml.FullLoader)
    print(f"Loaded records from {folder}/records.yaml.")

    # Every top-level entry except 'general' is a runnable IOC. Filtering
    # (rather than list.remove) avoids an unrelated ValueError when the
    # 'general' section is absent.
    ioc_list = [name for name in settings if name != 'general']

    if not args.i:
        problem = "Select IOC to run from these entries in settings file using -i flag:"
    elif args.i not in ioc_list:
        problem = "Given IOC not in settings file. Select from these:"
    else:
        return args.i, settings, records

    print(problem)
    for name in ioc_list:
        print(f" {name}")
    # The exit() helper is injected by the site module and is not guaranteed to
    # exist (e.g. python -S); raising SystemExit gives the same exit status.
    raise SystemExit
# Start the IOC only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())