From 7bdb70a62c9b490af1c5de1d8b511fe54f69bf32 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 11 Nov 2022 21:17:36 -0600 Subject: [PATCH 01/31] add missing return when push succeeds --- src/pvaccess/SynchronizedQueue.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pvaccess/SynchronizedQueue.h b/src/pvaccess/SynchronizedQueue.h index 14d54e9..b6c5d24 100644 --- a/src/pvaccess/SynchronizedQueue.h +++ b/src/pvaccess/SynchronizedQueue.h @@ -287,6 +287,7 @@ void SynchronizedQueue::push(const T& t, double timeout) int size = std::queue::size(); if (maxLength <= 0 || size < maxLength) { pushUnsynchronized(t); + return; } // Clear pop event. itemPoppedEvent.tryWait(); From f8b0bf3058a0ec7ffbcdc95c1499d64c68425190 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 11 Nov 2022 21:25:14 -0600 Subject: [PATCH 02/31] add ability to load images from hdf5 files --- documentation/RELEASE_NOTES.md | 10 ++ pvapy/cli/adSimServer.py | 304 ++++++++++++++++++++++----------- 2 files changed, 219 insertions(+), 95 deletions(-) diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index 6e78ea3..fa8b092 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -1,3 +1,13 @@ +## Release 5.2.1 (2022/11/DD) + +- Fixed issue with put into PvObjectQueue when timeout is given +- Area Detector Simulator enhancements: + - added ability to load images from HDF5 files +- Conda/pip package dependencies: + - EPICS BASE = 7.0.7 + - BOOST = 1.78.0 + - NUMPY >= 1.22 (for python >= 3.8); >= 1.19, < 1.21 (for python < 3.8) + ## Release 5.2.0 (2022/11/04) - Streaming Framework enhancements: diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index 22efef0..70c57a3 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -5,10 +5,16 @@ import threading import queue import argparse -import numpy as np -import pvaccess as pva import os import os.path +import numpy as np +# HDF5 is optional +try: + import h5py as 
h5 +except ImportError: + h5 = None + +import pvaccess as pva from ..utility.adImageUtility import AdImageUtility from ..utility.floatWithUnits import FloatWithUnits from ..utility.intWithUnits import IntWithUnits @@ -17,6 +23,13 @@ class AdSimServer: + # Uses frame cache of a given size. If the number of input + # files is larger than the cache size, the server will be constantly + # regenerating frames. + + SHUTDOWN_DELAY = 1.0 + MIN_CACHE_SIZE = 1 + CACHE_TIMEOUT = 1.0 DELAY_CORRECTION = 0.0001 NOTIFICATION_DELAY = 0.1 BYTES_IN_MEGABYTE = 1000000 @@ -25,78 +38,55 @@ class AdSimServer: 'timeStamp' : pva.PvTimeStamp() } - def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod): + def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, frameRate, nFrames, cacheSize, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod, disableCurses): + self.lock = threading.Lock() self.deltaT = 0 + self.cacheTimeout = self.CACHE_TIMEOUT if frameRate > 0: self.deltaT = 1.0/frameRate + self.cacheTimeout = max(self.CACHE_TIMEOUT, self.deltaT) self.runtime = runtime self.reportPeriod = reportPeriod - self.pvaServer = pva.PvaServer() self.metadataIoc = None + self.frameSourceList = [] + self.frameCacheSize = max(cacheSize, self.MIN_CACHE_SIZE) + self.nFrames = nFrames - self.setupMetadataPvs(metadataPv) inputFiles = [] if inputDirectory is not None: inputFiles = [os.path.join(inputDirectory, f) for f in os.listdir(inputDirectory) if os.path.isfile(os.path.join(inputDirectory, f))] if inputFile is not None: inputFiles.append(inputFile) - self.frames = None for f in inputFiles: - try: - if mmapMode: - newFrames = np.load(f, mmapMode='r') - else: - newFrames = np.load(f) - if self.frames is None: - self.frames = newFrames - else: - self.frames = np.append(self.frames, 
newFrames, axis=0) - print(f'Loaded input file {f}') - except Exception as ex: - print(f'Cannot load input file {f}, skipping it: {ex}') - if self.frames is None: - print('Generating random frames') - # Example frame: - # frame = np.array([[0,0,0,0,0,0,0,0,0,0], - # [0,0,0,0,1,1,0,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) - dt = np.dtype(datatype) - if datatype != 'float32' and datatype != 'float64': - dtinfo = np.iinfo(dt) - mn = dtinfo.min - if minimum is not None: - mn = int(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = int(min(dtinfo.max, maximum)) - self.frames = np.random.randint(mn, mx, size=(nf, ny, nx), dtype=dt) - else: - # Use float32 for min/max, to prevent overflow errors - dtinfo = np.finfo(np.float32) - mn = dtinfo.min - if minimum is not None: - mn = float(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = float(min(dtinfo.max, maximum)) - self.frames = np.random.uniform(mn, mx, size=(nf, ny, nx)) - if datatype == 'float32': - self.frames = np.float32(self.frames) - - print(f'Generated frame shape: {self.frames[0].shape}') - print(f'Range of generated values: [{mn},{mx}]') + frames = self.loadInputFile(f, mmapMode=mmapMode, hdfDatasetPath=hdfDataset) + if frames is not None: + self.frameSourceList.append(frames) + + if not self.frameSourceList: + nf = nFrames + if nf <= 0: + nf = self.frameCacheSize + frames = self.generateFrames(nf, nx, ny, datatype, minimum, maximum) + self.frameSourceList.append(frames) + + self.nInputFrames = 0 + for fs in self.frameSourceList: + nInputFrames, self.rows, self.cols = fs.shape + self.nInputFrames += nInputFrames + if self.nFrames > 0: + self.nInputFrames = min(self.nFrames, self.nInputFrames) + + frames = self.frameSourceList[0] + self.frameRate = frameRate - self.nInputFrames, self.rows, self.cols = self.frames.shape - self.imageSize = 
IntWithUnits(self.rows*self.cols*self.frames[0].itemsize, 'B') + self.imageSize = IntWithUnits(self.rows*self.cols*frames[0].itemsize, 'B') self.expectedDataRate = FloatWithUnits(self.imageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') - print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.imageSize}, type: {self.frames.dtype})') - print(f'Expected data rate: {self.expectedDataRate}') self.channelName = channelName + self.pvaServer = pva.PvaServer() + self.setupMetadataPvs(metadataPv) self.pvaServer.addRecord(self.channelName, pva.NtNdArray(), None) + if notifyPv and notifyPvValue: try: time.sleep(self.NOTIFICATION_DELAY) @@ -106,7 +96,19 @@ def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, d except Exception as ex: print(f'Could not set notification PV {notifyPv} to {notifyPvValue}: {ex}') - self.frameMap = {} + # Use PvObjectQueue if cache size is too small for all input frames + # Otherwise, simple dictionary is good enough + self.usingQueue = False + if self.nInputFrames > self.frameCacheSize: + self.usingQueue = True + self.frameCache = pva.PvObjectQueue(self.frameCacheSize) + else: + self.frameCache = {} + + print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.imageSize}, type: {frames.dtype})') + print(f'Frame cache type: {type(self.frameCache)} (cache size: {self.frameCacheSize})') + print(f'Expected data rate: {self.expectedDataRate}') + self.currentFrameId = 0 self.nPublishedFrames = 0 self.startTime = 0 @@ -115,17 +117,82 @@ def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, d self.isDone = False self.screen = None self.screenInitialized = False + self.disableCurses = disableCurses def setupCurses(self): screen = None - try: - import curses - screen = curses.initscr() - self.curses = curses - except ImportError as ex: - pass + if not self.disableCurses: + try: + import curses + screen = curses.initscr() + self.curses = 
curses + except ImportError as ex: + pass return screen + def loadInputFile(self, filePath, mmapMode=False, hdfDatasetPath=None): + try: + frames = None + allowedHdfExtensions = ['.h5', '.hdf', '.hdf5'] + isHdf = False + for ext in allowedHdfExtensions: + if filePath.endswith(ext): + isHdf = True + break + if isHdf: + if not h5: + raise Exception(f'Missing HDF support.') + if not hdfDatasetPath: + raise Exception(f'Missing HDF dataset specification for input file {filePath}.') + hdfFile = h5.File(filePath, 'r') + hdfDataset = hdfFile[hdfDatasetPath] + frames = hdfDataset + else: + if mmapMode: + frames = np.load(filePath, mmapMode='r') + else: + frames = np.load(filePath) + print(f'Loaded input file {filePath}') + except Exception as ex: + print(f'Cannot load input file {filePath}, skipping it: {ex}') + return frames + + def generateFrames(self, nf, nx, ny, datatype, minimum, maximum): + print('Generating random frames') + # Example frame: + # frame = np.array([[0,0,0,0,0,0,0,0,0,0], + # [0,0,0,0,1,1,0,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) + dt = np.dtype(datatype) + if datatype != 'float32' and datatype != 'float64': + dtinfo = np.iinfo(dt) + mn = dtinfo.min + if minimum is not None: + mn = int(max(dtinfo.min, minimum)) + mx = dtinfo.max + if maximum is not None: + mx = int(min(dtinfo.max, maximum)) + frames = np.random.randint(mn, mx, size=(nf, ny, nx), dtype=dt) + else: + # Use float32 for min/max, to prevent overflow errors + dtinfo = np.finfo(np.float32) + mn = dtinfo.min + if minimum is not None: + mn = float(max(dtinfo.min, minimum)) + mx = dtinfo.max + if maximum is not None: + mx = float(min(dtinfo.max, maximum)) + frames = np.random.uniform(mn, mx, size=(nf, ny, nx)) + if datatype == 'float32': + frames = np.float32(frames) + + print(f'Generated frame shape: {frames[0].shape}') + print(f'Range of generated values: [{mn},{mx}]') + return frames + def 
setupMetadataPvs(self, metadataPv): self.caMetadataPvs = [] self.pvaMetadataPvs = [] @@ -193,21 +260,55 @@ def updateMetadataPvs(self, metadataValueDict): self.pvaServer.update(mPv, mPvObject) return t - def frameProducer(self, extraFieldsPvObject=None): - for frameId in range(0, self.nInputFrames): - if self.isDone: - return + def addFrameToCache(self, frameId, ntnda): + if not self.usingQueue: + # Using dictionary + self.frameCache[frameId] = ntnda + else: + # Using PvObjectQueue + try: + self.frameCache.put(ntnda, self.cacheTimeout) + except pva.QueueFull: + pass + + def getFrameFromCache(self): + if not self.usingQueue: + # Using dictionary + cachedFrameId = self.currentFrameId % self.nInputFrames + if cachedFrameId not in self.frameCache: + # In case frames were not generated on time, just use first frame + cachedFrameId = 0 + ntnda = self.frameCache[cachedFrameId] + else: + # Using PvObjectQueue + ntnda = self.frameCache.get(self.cacheTimeout) + return ntnda - frame = self.frames[frameId] - self.frameMap[frameId] = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) + def frameProducer(self, extraFieldsPvObject=None): + startTime = time.time() + frameId = 0 + while not self.isDone: + for frames in self.frameSourceList: + for frame in frames: + if self.isDone or (self.nFrames > 0 and frameId >= self.nFrames): + break + ntnda = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) + self.addFrameToCache(frameId, ntnda) + frameId += 1 + if self.reportPeriod > 0 and (frameId % self.reportPeriod) == 0: + now = time.time() + genTime = now - startTime + genRate = frameId/genTime + report = 'Generated frame id {:6d} @ {:.3f}s (gen rate: {:.4f}fps; gen time: {:.3f}s)'.format(frameId, now, genRate, genTime) + self.printReport(report) + if not self.usingQueue: + # All frames in cache + break + self.printReport(f'Frame producer thread is done after {frameId} generated frames') def prepareFrame(self, t=0): # Get cached frame - 
cachedFrameId = self.currentFrameId % self.nInputFrames - if cachedFrameId not in self.frameMap: - # In case frames were not generated on time, use first frame - cachedFrameId = 0 - frame = self.frameMap[cachedFrameId] + frame = self.getFrameFromCache() # Correct image id and timestamps self.currentFrameId += 1 @@ -232,7 +333,12 @@ def framePublisher(self): # Prepare frame with a given timestamp # so that metadata and image times are as close as possible - frame = self.prepareFrame(updateTime) + try: + frame = self.prepareFrame(updateTime) + except Exception: + if self.isDone: + return + raise # Publish frame self.pvaServer.update(self.channelName, frame) @@ -248,22 +354,11 @@ def framePublisher(self): else: self.startTime = self.lastPublishedTime if self.reportPeriod > 0 and (self.nPublishedFrames % self.reportPeriod) == 0: - report = 'Published frame id {:6d} @ {:.3f}s (frame rate: {:.4f}fps; runtime: {:.3f}s)'.format(self.currentFrameId, self.lastPublishedTime, frameRate, runtime) - if not self.screenInitialized: - self.screenInitialized = True - self.screen = self.setupCurses() - - if self.screen: - self.screen.erase() - self.screen.addstr(f'{report}\n') - self.screen.refresh() - else: - print(report) + report = 'Published frame id {:6d} @ {:.3f}s (pub rate: {:.4f}fps; pub time: {:.3f}s)'.format(self.currentFrameId, self.lastPublishedTime, frameRate, runtime) + self.printReport(report) if runtime > self.runtime: - if self.screen: - self.curses.endwin() - print('Server exiting after reaching runtime of {:.3f} seconds'.format(self.runtime)) + self.printReport(f'Server exiting after reaching runtime of {runtime:.3f} seconds') return if self.deltaT > 0: @@ -273,6 +368,18 @@ def framePublisher(self): threading.Timer(delay, self.framePublisher).start() return + def printReport(self, report): + with self.lock: + if not self.screenInitialized: + self.screenInitialized = True + self.screen = self.setupCurses() + if self.screen: + self.screen.erase() + 
self.screen.addstr(f'{report}\n') + self.screen.refresh() + else: + print(report) + def start(self): threading.Thread(target=self.frameProducer, daemon=True).start() self.pvaServer.start() @@ -282,9 +389,13 @@ def stop(self): self.isDone = True self.pvaServer.stop() runtime = self.lastPublishedTime - self.startTime - deltaT = runtime/(self.nPublishedFrames - 1) - frameRate = 1.0/deltaT + deltaT = 0 + frameRate = 0 + if self.nPublishedFrames > 1: + deltaT = runtime/(self.nPublishedFrames - 1) + frameRate = 1.0/deltaT dataRate = FloatWithUnits(self.imageSize*frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + time.sleep(self.SHUTDOWN_DELAY) if self.screen: self.curses.endwin() print('\nServer runtime: {:.4f} seconds'.format(runtime)) @@ -297,13 +408,15 @@ def main(): parser.add_argument('-id', '--input-directory', type=str, dest='input_directory', default=None, help='Directory containing input files to be streamed; if input directory or input file are not provided, random images will be generated') parser.add_argument('-if', '--input-file', type=str, dest='input_file', default=None, help='Input file to be streamed; if input directory or input file are not provided, random images will be generated') parser.add_argument('-mm', '--mmap-mode', action='store_true', dest='mmap_mode', default=False, help='Use NumPy memory map to load the specified input file. This flag typically results in faster startup and lower memory usage for large files.') + parser.add_argument('-hds', '--hdf-dataset', dest='hdf_dataset', default=None, help='HDF5 dataset path. 
This option must be specified if HDF5 files are used as input, but otherwise it is ignored.') parser.add_argument('-fps', '--frame-rate', type=float, dest='frame_rate', default=20, help='Frames per second (default: 20 fps)') parser.add_argument('-nx', '--n-x-pixels', type=int, dest='n_x_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file file is given)') parser.add_argument('-ny', '--n-y-pixels', type=int, dest='n_y_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file is given)') parser.add_argument('-dt', '--datatype', type=str, dest='datatype', default='uint8', help='Generated datatype. Possible options are int8, uint8, int16, uint16, int32, uint32, float32, float64 (default: uint8; does not apply if input file is given)') parser.add_argument('-mn', '--minimum', type=float, dest='minimum', default=None, help='Minimum generated value (does not apply if input file is given)') parser.add_argument('-mx', '--maximum', type=float, dest='maximum', default=None, help='Maximum generated value (does not apply if input file is given)') - parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=1000, help='Number of different frames to generate and cache; those images will be published over and over again as long as the server is running') + parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=0, help='Number of different frames generate from the input sources; if set to <= 0, the server will use all images found in input files, or it will generate enough images to fill up the image cache if no input files were specified.') + parser.add_argument('-cs', '--cache-size', type=int, dest='cache_size', default=1000, help='Number of different frames to cache (default: 1000); if the cache size is smaller than the number of input frames, the new frames will be constantly regenerated as cached ones are published; otherwise, 
cached frames will be published over and over again as long as the server is running') parser.add_argument('-rt', '--runtime', type=float, dest='runtime', default=300, help='Server runtime in seconds (default: 300 seconds)') parser.add_argument('-cn', '--channel-name', type=str, dest='channel_name', default='pvapy:image', help='Server PVA channel name (default: pvapy:image)') parser.add_argument('-npv', '--notify-pv', type=str, dest='notify_pv', default=None, help='CA channel that should be notified on start; for the default Area Detector PVA driver PV that controls image acquisition is 13PVA1:cam1:Acquire') @@ -311,18 +424,19 @@ def main(): parser.add_argument('-mpv', '--metadata-pv', type=str, dest='metadata_pv', default=None, help='Comma-separated list of CA channels that should be contain simulated image metadata values') parser.add_argument('-sd', '--start-delay', type=float, dest='start_delay', default=10.0, help='Server start delay in seconds (default: 10 seconds)') parser.add_argument('-rp', '--report-period', type=int, dest='report_period', default=1, help='Reporting period for publishing frames; if set to <=0 no frames will be reported as published (default: 1)') + parser.add_argument('-dc', '--disable-curses', dest='disable_curses', default=False, action='store_true', help='Disable curses library screen handling. 
This is enabled by default, except when logging into standard output is turned on.') args, unparsed = parser.parse_known_args() if len(unparsed) > 0: print('Unrecognized argument(s): %s' % ' '.join(unparsed)) exit(1) - server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, frameRate=args.frame_rate, nf=args.n_frames, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period) + server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, hdfDataset=args.hdf_dataset, frameRate=args.frame_rate, nFrames=args.n_frames, cacheSize=args.cache_size, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period, disableCurses=args.disable_curses) server.start() try: - runtime = args.runtime + 2*args.start_delay - time.sleep(runtime) + waitTime = args.runtime+args.start_delay+server.SHUTDOWN_DELAY + time.sleep(waitTime) except KeyboardInterrupt as ex: pass server.stop() From 5f8081ef8eaa7eda317cda4d87d6fd9e8d4baf68 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 11 Nov 2022 21:56:18 -0600 Subject: [PATCH 03/31] do not report generated frames after publishing starts --- pvapy/cli/adSimServer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index 70c57a3..e828588 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -295,7 +295,7 @@ def frameProducer(self, extraFieldsPvObject=None): 
ntnda = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) self.addFrameToCache(frameId, ntnda) frameId += 1 - if self.reportPeriod > 0 and (frameId % self.reportPeriod) == 0: + if not self.nPublishedFrames and self.reportPeriod > 0 and (frameId % self.reportPeriod) == 0: now = time.time() genTime = now - startTime genRate = frameId/genTime From f9f657f57bd6d2387a30d3ffea7f34afa277f171 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 11 Nov 2022 22:17:21 -0600 Subject: [PATCH 04/31] simplify frame producer --- pvapy/cli/adSimServer.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index e828588..a4e547a 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -295,16 +295,10 @@ def frameProducer(self, extraFieldsPvObject=None): ntnda = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) self.addFrameToCache(frameId, ntnda) frameId += 1 - if not self.nPublishedFrames and self.reportPeriod > 0 and (frameId % self.reportPeriod) == 0: - now = time.time() - genTime = now - startTime - genRate = frameId/genTime - report = 'Generated frame id {:6d} @ {:.3f}s (gen rate: {:.4f}fps; gen time: {:.3f}s)'.format(frameId, now, genRate, genTime) - self.printReport(report) if not self.usingQueue: - # All frames in cache + # All frames are in cache break - self.printReport(f'Frame producer thread is done after {frameId} generated frames') + self.printReport(f'Frame producer is done after {frameId} generated frames') def prepareFrame(self, t=0): # Get cached frame From bdd290f07bb7397c6b8ef3a97bc8492f5e815ae8 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Sat, 12 Nov 2022 08:50:44 -0600 Subject: [PATCH 05/31] restore old output --- pvapy/cli/adSimServer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index a4e547a..134b269 100755 --- a/pvapy/cli/adSimServer.py +++ 
b/pvapy/cli/adSimServer.py @@ -348,7 +348,7 @@ def framePublisher(self): else: self.startTime = self.lastPublishedTime if self.reportPeriod > 0 and (self.nPublishedFrames % self.reportPeriod) == 0: - report = 'Published frame id {:6d} @ {:.3f}s (pub rate: {:.4f}fps; pub time: {:.3f}s)'.format(self.currentFrameId, self.lastPublishedTime, frameRate, runtime) + report = 'Published frame id {:6d} @ {:.3f}s (frame rate: {:.4f}fps; runtime: {:.3f}s)'.format(self.currentFrameId, self.lastPublishedTime, frameRate, runtime) self.printReport(report) if runtime > self.runtime: From e0bfc369794b77ea38e15c9eeaa2717bbab9a5da Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 14 Nov 2022 08:29:45 -0600 Subject: [PATCH 06/31] updated default value for userTag field to zero; added one more constructor to PvTimeStamp() class --- documentation/RELEASE_NOTES.md | 1 + src/pvaccess/PvTimeStamp.cpp | 12 +++++++++++- src/pvaccess/PvTimeStamp.h | 1 + src/pvaccess/pvaccess.PvTimeStamp.cpp | 6 ++++-- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index fa8b092..0c8d12e 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -1,6 +1,7 @@ ## Release 5.2.1 (2022/11/DD) - Fixed issue with put into PvObjectQueue when timeout is given +- Updated default value for PvTimeStamp userTag field to 0 - Area Detector Simulator enhancements: - added ability to load images from HDF5 files - Conda/pip package dependencies: diff --git a/src/pvaccess/PvTimeStamp.cpp b/src/pvaccess/PvTimeStamp.cpp index 3e334cc..9ea6277 100644 --- a/src/pvaccess/PvTimeStamp.cpp +++ b/src/pvaccess/PvTimeStamp.cpp @@ -11,7 +11,7 @@ const char* PvTimeStamp::SecondsPastEpochFieldKey("secondsPastEpoch"); const char* PvTimeStamp::NanosecondsFieldKey("nanoseconds"); const char* PvTimeStamp::UserTagFieldKey("userTag"); -const int PvTimeStamp::UnknownUserTag(-1); +const int PvTimeStamp::UnknownUserTag(0); boost::python::dict 
PvTimeStamp::createStructureDict() { @@ -48,6 +48,16 @@ PvTimeStamp::PvTimeStamp(double time) : setUserTag(UnknownUserTag); } +PvTimeStamp::PvTimeStamp(double int userTag) : + PvObject(createStructureDict(), StructureId) +{ + long long secondsPastEpoch(time); + int nanoseconds((time-secondsPastEpoch)*NanosecondsInSecond); + setSecondsPastEpoch(secondsPastEpoch); + setNanoseconds(nanoseconds); + setUserTag(userTag); +} + PvTimeStamp::PvTimeStamp(const epicsTimeStamp& ts) : PvObject(createStructureDict(), StructureId) { diff --git a/src/pvaccess/PvTimeStamp.h b/src/pvaccess/PvTimeStamp.h index 0344233..06ce043 100644 --- a/src/pvaccess/PvTimeStamp.h +++ b/src/pvaccess/PvTimeStamp.h @@ -29,6 +29,7 @@ class PvTimeStamp : public PvObject // Instance methods PvTimeStamp(); PvTimeStamp(double time); + PvTimeStamp(double time, int userTag); PvTimeStamp(const epicsTimeStamp* ts); PvTimeStamp(const epicsTimeStamp& ts); PvTimeStamp(long long secondsPastEpoch, int nanoseconds); diff --git a/src/pvaccess/pvaccess.PvTimeStamp.cpp b/src/pvaccess/pvaccess.PvTimeStamp.cpp index c52fd9c..b74218a 100644 --- a/src/pvaccess/pvaccess.PvTimeStamp.cpp +++ b/src/pvaccess/pvaccess.PvTimeStamp.cpp @@ -20,8 +20,8 @@ class_ >("PvTimeStamp", "**PvTimeStamp(time)**\n\n" "\t:Parameter: *time* (float) - time represented as float, including seconds and fractional seconds\n\n" "\t::\n\n" - "\t\ttimeStamp2 = PvTimeStamp(1234567890.00123)\n\n" - "**PvTimeStamp(secondsPastEpoch, nanoseconds [, userTag=-1])**\n\n" + "\t\ttimeStamp2 = PvTimeStamp(1234567890.00123 [, userTag=0])\n\n" + "**PvTimeStamp(secondsPastEpoch, nanoseconds [, userTag=0])**\n\n" "\t:Parameter: *secondsPastEpoch* (long) - seconds past epoch\n\n" "\t:Parameter: *nanoseconds* (int) - nanoseconds\n\n" "\t:Parameter: *userTag* (int) - user tag\n\n" @@ -32,6 +32,8 @@ class_ >("PvTimeStamp", .def(init()) + .def(init()) + .def(init()) .def(init()) From 5b4544c71d06ddc8f86fd256489da50002ee833a Mon Sep 17 00:00:00 2001 From: Sinisa Veseli 
Date: Mon, 14 Nov 2022 08:57:39 -0600 Subject: [PATCH 07/31] fixed typo --- src/pvaccess/PvTimeStamp.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pvaccess/PvTimeStamp.cpp b/src/pvaccess/PvTimeStamp.cpp index 9ea6277..66c1783 100644 --- a/src/pvaccess/PvTimeStamp.cpp +++ b/src/pvaccess/PvTimeStamp.cpp @@ -48,7 +48,7 @@ PvTimeStamp::PvTimeStamp(double time) : setUserTag(UnknownUserTag); } -PvTimeStamp::PvTimeStamp(double int userTag) : +PvTimeStamp::PvTimeStamp(double time, int userTag) : PvObject(createStructureDict(), StructureId) { long long secondsPastEpoch(time); From 24eaeaeaf6d238fdf092d3c11f90d0a5e05baca7 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 14 Nov 2022 09:29:41 -0600 Subject: [PATCH 08/31] updated doc string --- pvapy/hpc/userDataProcessor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pvapy/hpc/userDataProcessor.py b/pvapy/hpc/userDataProcessor.py index fb0e0ed..9f5bd18 100644 --- a/pvapy/hpc/userDataProcessor.py +++ b/pvapy/hpc/userDataProcessor.py @@ -11,6 +11,7 @@ class UserDataProcessor: The following variables will be set after processor instance is created and before processing starts:\n \t\- *logger* (logging.Logger) : logger object\n \t\- *processorId* (int) : processor id\n + \t\- *inputChannel* (str) : input channel\n \t\- *outputChannel* (str) : output channel\n \t\- *objectIdField* (str) : name of the object id field\n \t\- *pvaServer* (PvaServer) : PVA Server instance for publishing output objects\n From 3afc3f2185027515e19abc7ace6c71d72f749c9a Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Wed, 23 Nov 2022 16:04:35 -0600 Subject: [PATCH 09/31] add ability to stream compressed data from hdf5 --- documentation/RELEASE_NOTES.md | 2 +- pvapy/cli/adSimServer.py | 265 ++++++++++++++++++++++---------- pvapy/utility/adImageUtility.py | 55 ++++--- 3 files changed, 213 insertions(+), 109 deletions(-) diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index 
0c8d12e..02e0b0d 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -3,7 +3,7 @@ - Fixed issue with put into PvObjectQueue when timeout is given - Updated default value for PvTimeStamp userTag field to 0 - Area Detector Simulator enhancements: - - added ability to load images from HDF5 files + - added ability to load images from HDF5 files (either compressed or uncompressed) - Conda/pip package dependencies: - EPICS BASE = 7.0.7 - BOOST = 1.78.0 diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index 134b269..b686f80 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -13,6 +13,10 @@ import h5py as h5 except ImportError: h5 = None +try: + import hdf5plugin +except ImportError: + pass import pvaccess as pva from ..utility.adImageUtility import AdImageUtility @@ -21,6 +25,156 @@ __version__ = pva.__version__ +class FrameGenerator: + def __init__(self): + self.frames = None + self.nInputFrames = 0 + self.rows = 0 + self.cols = 0 + self.dtype = None + self.compressorName = None + + def getFrameData(self, frameId): + if frameId < self.nInputFrames and frameId >= 0: + return self.frames[frameId] + return None + + def getFrameInfo(self): + if self.frames is not None and not self.nInputFrames: + self.nInputFrames, self.rows, self.cols = self.frames.shape + self.dtype = self.frames.dtype + return (self.nInputFrames, self.rows, self.cols, self.dtype, self.compressorName) + + def getUncompressedFrameSize(self): + return self.rows*self.cols*self.frames[0].itemsize + + def getCompressedFrameSize(self): + if self.compressorName: + return len(self.getFrameData(0)) + else: + return self.getUncompressedFrameSize() + + def getCompressorName(self): + return self.compressorName + +class HdfFileGenerator(FrameGenerator): + + COMPRESSOR_NAME_MAP = { + '32001' : 'blosc' + } + + def __init__(self, filePath, datasetPath, compressionMode=False): + FrameGenerator.__init__(self) + self.filePath = filePath + self.datasetPath = 
datasetPath + self.dataset = None + self.compressionMode = compressionMode + if not h5: + raise Exception(f'Missing HDF support.') + if not filePath: + raise Exception(f'Invalid input file path.') + if not datasetPath: + raise Exception(f'Missing HDF dataset specification for input file {filePath}.') + self.loadInputFile() + + def loadInputFile(self): + try: + self.file = h5.File(self.filePath, 'r') + self.dataset = self.file[self.datasetPath] + self.frames = self.dataset + if self.compressionMode: + for id,params in self.dataset._filters.items(): + compressorName = self.COMPRESSOR_NAME_MAP.get(id) + if compressorName: + self.compressorName = compressorName + break + print(f'Loaded input file {self.filePath} (compressor: {self.compressorName})') + except Exception as ex: + print(f'Cannot load input file {self.filePath}: {ex}') + raise + + def getFrameData(self, frameId): + frameData = None + if frameId < self.nInputFrames and frameId >= 0: + if not self.compressorName: + # Read uncompressed data + frameData = self.frames[frameId] + else: + # Read compressed data directly into numpy array + data = self.dataset.id.read_direct_chunk((frameId,0,0)) + frameData = np.frombuffer(data[1], dtype=np.uint8) + return frameData + +class NumpyFileGenerator(FrameGenerator): + + def __init__(self, filePath, mmapMode): + FrameGenerator.__init__(self) + self.filePath = filePath + self.mmapMode = mmapMode + if not filePath: + raise Exception(f'Invalid input file path.') + self.loadInputFile() + + def loadInputFile(self): + try: + if self.mmapMode: + self.frames = np.load(self.filePath, mmapMode='r') + else: + self.frames = np.load(self.filePath) + print(f'Loaded input file {self.filePath}') + except Exception as ex: + print(f'Cannot load input file {self.filePath}: {ex}') + raise + +class NumpyRandomGenerator(FrameGenerator): + + def __init__(self, nf, nx, ny, datatype, minimum, maximum): + FrameGenerator.__init__(self) + self.nf = nf + self.nx = nx + self.ny = ny + self.datatype = 
datatype + self.minimum = minimum + self.maximum = maximum + self.generateFrames() + + def generateFrames(self): + print('Generating random frames') + + # Example frame: + # frame = np.array([[0,0,0,0,0,0,0,0,0,0], + # [0,0,0,0,1,1,0,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) + + dt = np.dtype(self.datatype) + if not self.datatype.startswith('float'): + dtinfo = np.iinfo(dt) + mn = dtinfo.min + if self.minimum is not None: + mn = int(max(dtinfo.min, self.minimum)) + mx = dtinfo.max + if self.maximum is not None: + mx = int(min(dtinfo.max, self.maximum)) + self.frames = np.random.randint(mn, mx, size=(self.nf, self.ny, self.nx), dtype=dt) + else: + # Use float32 for min/max, to prevent overflow errors + dtinfo = np.finfo(np.float32) + mn = dtinfo.min + if self.minimum is not None: + mn = float(max(dtinfo.min, self.minimum)) + mx = dtinfo.max + if self.maximum is not None: + mx = float(min(dtinfo.max, self.maximum)) + self.frames = np.random.uniform(mn, mx, size=(self.nf, self.ny, self.nx)) + if datatype == 'float32': + self.frames = np.float32(self.frames) + + print(f'Generated frame shape: {self.frames[0].shape}') + print(f'Range of generated values: [{mn},{mx}]') + class AdSimServer: # Uses frame cache of a given size. 
If the number of input @@ -38,7 +192,7 @@ class AdSimServer: 'timeStamp' : pva.PvTimeStamp() } - def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, frameRate, nFrames, cacheSize, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod, disableCurses): + def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, hdfCompressionMode, frameRate, nFrames, cacheSize, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod, disableCurses): self.lock = threading.Lock() self.deltaT = 0 self.cacheTimeout = self.CACHE_TIMEOUT @@ -48,7 +202,7 @@ def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, frameRate, n self.runtime = runtime self.reportPeriod = reportPeriod self.metadataIoc = None - self.frameSourceList = [] + self.frameGeneratorList = [] self.frameCacheSize = max(cacheSize, self.MIN_CACHE_SIZE) self.nFrames = nFrames @@ -57,30 +211,33 @@ def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, frameRate, n inputFiles = [os.path.join(inputDirectory, f) for f in os.listdir(inputDirectory) if os.path.isfile(os.path.join(inputDirectory, f))] if inputFile is not None: inputFiles.append(inputFile) + allowedHdfExtensions = ['h5', 'hdf', 'hdf5'] for f in inputFiles: - frames = self.loadInputFile(f, mmapMode=mmapMode, hdfDatasetPath=hdfDataset) - if frames is not None: - self.frameSourceList.append(frames) + ext = f.split('.')[-1] + if ext in allowedHdfExtensions: + self.frameGeneratorList.append(HdfFileGenerator(f, hdfDataset, hdfCompressionMode)) + else: + self.frameGeneratorList.append(NumpyFileGenerator(f, mmapMode)) - if not self.frameSourceList: + if not self.frameGeneratorList: nf = nFrames if nf <= 0: nf = self.frameCacheSize - frames = self.generateFrames(nf, nx, ny, datatype, minimum, maximum) - self.frameSourceList.append(frames) + self.frameGeneratorList.append(NumpyRandomGenerator(nf, 
nx, ny, datatype, minimum, maximum)) self.nInputFrames = 0 - for fs in self.frameSourceList: - nInputFrames, self.rows, self.cols = fs.shape + for fg in self.frameGeneratorList: + nInputFrames, self.rows, self.cols, self.dtype, self.compressorName = fg.getFrameInfo() self.nInputFrames += nInputFrames if self.nFrames > 0: self.nInputFrames = min(self.nFrames, self.nInputFrames) - frames = self.frameSourceList[0] - + fg = self.frameGeneratorList[0] self.frameRate = frameRate - self.imageSize = IntWithUnits(self.rows*self.cols*frames[0].itemsize, 'B') - self.expectedDataRate = FloatWithUnits(self.imageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + self.uncompressedImageSize = IntWithUnits(fg.getUncompressedFrameSize(), 'B') + self.compressedImageSize = IntWithUnits(fg.getCompressedFrameSize(), 'B') + self.compressedDataRate = FloatWithUnits(self.compressedImageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + self.uncompressedDataRate = FloatWithUnits(self.uncompressedImageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') self.channelName = channelName self.pvaServer = pva.PvaServer() @@ -105,9 +262,9 @@ def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, frameRate, n else: self.frameCache = {} - print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.imageSize}, type: {frames.dtype})') + print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.uncompressedImageSize}, type: {self.dtype}, compressor: {self.compressorName}, compressed size: {self.compressedImageSize})') print(f'Frame cache type: {type(self.frameCache)} (cache size: {self.frameCacheSize})') - print(f'Expected data rate: {self.expectedDataRate}') + print(f'Expected data rate: {self.compressedDataRate} (uncompressed: {self.uncompressedDataRate})') self.currentFrameId = 0 self.nPublishedFrames = 0 @@ -130,69 +287,6 @@ def setupCurses(self): pass return screen - def loadInputFile(self, filePath, mmapMode=False, 
hdfDatasetPath=None): - try: - frames = None - allowedHdfExtensions = ['.h5', '.hdf', '.hdf5'] - isHdf = False - for ext in allowedHdfExtensions: - if filePath.endswith(ext): - isHdf = True - break - if isHdf: - if not h5: - raise Exception(f'Missing HDF support.') - if not hdfDatasetPath: - raise Exception(f'Missing HDF dataset specification for input file {filePath}.') - hdfFile = h5.File(filePath, 'r') - hdfDataset = hdfFile[hdfDatasetPath] - frames = hdfDataset - else: - if mmapMode: - frames = np.load(filePath, mmapMode='r') - else: - frames = np.load(filePath) - print(f'Loaded input file {filePath}') - except Exception as ex: - print(f'Cannot load input file {filePath}, skipping it: {ex}') - return frames - - def generateFrames(self, nf, nx, ny, datatype, minimum, maximum): - print('Generating random frames') - # Example frame: - # frame = np.array([[0,0,0,0,0,0,0,0,0,0], - # [0,0,0,0,1,1,0,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) - dt = np.dtype(datatype) - if datatype != 'float32' and datatype != 'float64': - dtinfo = np.iinfo(dt) - mn = dtinfo.min - if minimum is not None: - mn = int(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = int(min(dtinfo.max, maximum)) - frames = np.random.randint(mn, mx, size=(nf, ny, nx), dtype=dt) - else: - # Use float32 for min/max, to prevent overflow errors - dtinfo = np.finfo(np.float32) - mn = dtinfo.min - if minimum is not None: - mn = float(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = float(min(dtinfo.max, maximum)) - frames = np.random.uniform(mn, mx, size=(nf, ny, nx)) - if datatype == 'float32': - frames = np.float32(frames) - - print(f'Generated frame shape: {frames[0].shape}') - print(f'Range of generated values: [{mn},{mx}]') - return frames - def setupMetadataPvs(self, metadataPv): self.caMetadataPvs = [] self.pvaMetadataPvs = [] @@ -288,11 +382,13 @@ def 
frameProducer(self, extraFieldsPvObject=None): startTime = time.time() frameId = 0 while not self.isDone: - for frames in self.frameSourceList: - for frame in frames: + for fg in self.frameGeneratorList: + nInputFrames, ny, nx, dtype, compressorName = fg.getFrameInfo() + for fgFrameId in range(0,nInputFrames): if self.isDone or (self.nFrames > 0 and frameId >= self.nFrames): break - ntnda = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) + frameData = fg.getFrameData(fgFrameId) + ntnda = AdImageUtility.generateNtNdArray2D(frameId, frameData, nx, ny, dtype, compressorName, extraFieldsPvObject) self.addFrameToCache(frameId, ntnda) frameId += 1 if not self.usingQueue: @@ -388,7 +484,7 @@ def stop(self): if self.nPublishedFrames > 1: deltaT = runtime/(self.nPublishedFrames - 1) frameRate = 1.0/deltaT - dataRate = FloatWithUnits(self.imageSize*frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + dataRate = FloatWithUnits(self.uncompressedImageSize*frameRate/self.BYTES_IN_MEGABYTE, 'MBps') time.sleep(self.SHUTDOWN_DELAY) if self.screen: self.curses.endwin() @@ -403,6 +499,7 @@ def main(): parser.add_argument('-if', '--input-file', type=str, dest='input_file', default=None, help='Input file to be streamed; if input directory or input file are not provided, random images will be generated') parser.add_argument('-mm', '--mmap-mode', action='store_true', dest='mmap_mode', default=False, help='Use NumPy memory map to load the specified input file. This flag typically results in faster startup and lower memory usage for large files.') parser.add_argument('-hds', '--hdf-dataset', dest='hdf_dataset', default=None, help='HDF5 dataset path. This option must be specified if HDF5 files are used as input, but otherwise it is ignored.') + parser.add_argument('-hcm', '--hdf-compression-mode', dest='hdf_compression_mode', default=False, action='store_true', help='Use compressed data from HDF5 file. 
By default, data will be uncompressed before streaming it.') parser.add_argument('-fps', '--frame-rate', type=float, dest='frame_rate', default=20, help='Frames per second (default: 20 fps)') parser.add_argument('-nx', '--n-x-pixels', type=int, dest='n_x_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file file is given)') parser.add_argument('-ny', '--n-y-pixels', type=int, dest='n_y_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file is given)') @@ -425,7 +522,7 @@ def main(): print('Unrecognized argument(s): %s' % ' '.join(unparsed)) exit(1) - server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, hdfDataset=args.hdf_dataset, frameRate=args.frame_rate, nFrames=args.n_frames, cacheSize=args.cache_size, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period, disableCurses=args.disable_curses) + server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, hdfDataset=args.hdf_dataset, hdfCompressionMode=args.hdf_compression_mode, frameRate=args.frame_rate, nFrames=args.n_frames, cacheSize=args.cache_size, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period, disableCurses=args.disable_curses) server.start() try: diff --git a/pvapy/utility/adImageUtility.py b/pvapy/utility/adImageUtility.py index 2fd3a90..c470ebb 100644 --- 
a/pvapy/utility/adImageUtility.py +++ b/pvapy/utility/adImageUtility.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - import time import numpy as np import pvaccess as pva @@ -34,16 +32,16 @@ class AdImageUtility: } PVA_DATA_TYPE_MAP = { - np.dtype('uint8') : [pva.UBYTE], - np.dtype('int8') : [pva.BYTE], - np.dtype('uint16') : [pva.USHORT], - np.dtype('int16') : [pva.SHORT], - np.dtype('uint32') : [pva.UINT], - np.dtype('int32') : [pva.INT], - np.dtype('uint64') : [pva.ULONG], - np.dtype('int64') : [pva.LONG], - np.dtype('float32') : [pva.FLOAT], - np.dtype('float64') : [pva.DOUBLE] + np.dtype('uint8') : pva.UBYTE, + np.dtype('int8') : pva.BYTE, + np.dtype('uint16') : pva.USHORT, + np.dtype('int16') : pva.SHORT, + np.dtype('uint32') : pva.UINT, + np.dtype('int32') : pva.INT, + np.dtype('uint64') : pva.ULONG, + np.dtype('int64') : pva.LONG, + np.dtype('float32') : pva.FLOAT, + np.dtype('float64') : pva.DOUBLE } @classmethod @@ -131,33 +129,42 @@ def getNtNdArrayDataFieldKey(cls, image): return cls.NTNDA_DATA_FIELD_KEY_MAP.get(image.dtype) @classmethod - def generateNtNdArray2D(cls, imageId, image, extraFieldsPvObject=None): + def generateNtNdArray2D(cls, imageId, imageData, nx=None, ny=None, dtype=None, compressorName=None, extraFieldsPvObject=None): if extraFieldsPvObject is None: ntNdArray = pva.NtNdArray() else: ntNdArray = pva.NtNdArray(extraFieldsPvObject.getStructureDict()) - dataFieldKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(image.dtype) - pvaDataType = cls.PVA_DATA_TYPE_MAP.get(image.dtype) - ny, nx = image.shape - data = image.flatten() + dataFieldKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(imageData.dtype) + data = imageData.flatten() + if not compressorName: + pvaDataType = cls.PVA_DATA_TYPE_MAP.get(imageData.dtype) + ny, nx = imageData.shape + size = nx*ny*data.itemsize + ntNdArray['compressedSize'] = size + ntNdArray['uncompressedSize'] = size + else: + dtype = np.dtype(dtype) + pvaDataType = cls.PVA_DATA_TYPE_MAP.get(dtype) + codec = pva.PvCodec(compressorName, 
pva.PvInt(int(pvaDataType))) + ntNdArray['codec'] = codec + size = nx*ny*dtype.itemsize + ntNdArray['uncompressedSize'] = size + ntNdArray['compressedSize'] = len(data) + ntNdArray['uniqueId'] = int(imageId) dims = [pva.PvDimension(nx, 0, nx, 1, False), \ pva.PvDimension(ny, 0, ny, 1, False)] ntNdArray['dimension'] = dims - size = nx*ny*data.itemsize - ntNdArray['compressedSize'] = size - ntNdArray['uncompressedSize'] = size ts = pva.PvTimeStamp(time.time()) ntNdArray['timeStamp'] = ts ntNdArray['dataTimeStamp'] = ts ntNdArray['descriptor'] = 'Image generated by PvaPy' - pvaTypeKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(image.dtype) # Alternative way of setting data - #u = pva.PvObject({dataFieldKey : pvaDataType}, {dataFieldKey : data}) + #u = pva.PvObject({dataFieldKey : [pvaDataType]}, {dataFieldKey : data}) #ntNdArray.setUnion(u) - ntNdArray['value'] = {pvaTypeKey : data} + ntNdArray['value'] = {dataFieldKey : data} attrs = [pva.NtAttribute('ColorMode', pva.PvInt(0))] ntNdArray['attribute'] = attrs if extraFieldsPvObject is not None: @@ -186,7 +193,7 @@ def replaceNtNdArrayImage2D(cls, ntNdArray, imageId, image, extraFieldsPvObject= ntNdArray['timeStamp'] = ts ntNdArray['dataTimeStamp'] = ts - u = pva.PvObject({dataFieldKey : pvaDataType}, {dataFieldKey : data}) + u = pva.PvObject({dataFieldKey : [pvaDataType]}, {dataFieldKey : data}) ntNdArray.setUnion(u) if extraFieldsPvObject is not None: ntNdArray.set(extraFieldsPvObject) From 93105478b319bbbfd0fa7b693d049d01926465a3 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Wed, 30 Nov 2022 19:41:55 -0600 Subject: [PATCH 10/31] better handling for serving images from files; fix runtime issue for small files --- pvapy/cli/adSimServer.py | 41 ++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index b686f80..481ba8a 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -381,33 +381,36 @@ def 
getFrameFromCache(self): def frameProducer(self, extraFieldsPvObject=None): startTime = time.time() frameId = 0 + frameData = None while not self.isDone: for fg in self.frameGeneratorList: nInputFrames, ny, nx, dtype, compressorName = fg.getFrameInfo() for fgFrameId in range(0,nInputFrames): - if self.isDone or (self.nFrames > 0 and frameId >= self.nFrames): + if self.isDone or (self.nInputFrames > 0 and frameId >= self.nInputFrames): break frameData = fg.getFrameData(fgFrameId) + if frameData is None: + break ntnda = AdImageUtility.generateNtNdArray2D(frameId, frameData, nx, ny, dtype, compressorName, extraFieldsPvObject) self.addFrameToCache(frameId, ntnda) frameId += 1 - if not self.usingQueue: - # All frames are in cache + if self.isDone or not self.usingQueue or frameData is None or (self.nInputFrames > 0 and frameId >= self.nInputFrames): + # All frames are in cache or we cannot generate any more data break self.printReport(f'Frame producer is done after {frameId} generated frames') def prepareFrame(self, t=0): # Get cached frame frame = self.getFrameFromCache() - - # Correct image id and timestamps - self.currentFrameId += 1 - frame['uniqueId'] = self.currentFrameId - if t <= 0: - t = time.time() - ts = pva.PvTimeStamp(t) - frame['timeStamp'] = ts - frame['dataTimeStamp'] = ts + if frame is not None: + # Correct image id and timestamps + self.currentFrameId += 1 + frame['uniqueId'] = self.currentFrameId + if t <= 0: + t = time.time() + ts = pva.PvTimeStamp(t) + frame['timeStamp'] = ts + frame['dataTimeStamp'] = ts return frame def framePublisher(self): @@ -425,6 +428,10 @@ def framePublisher(self): # so that metadata and image times are as close as possible try: frame = self.prepareFrame(updateTime) + except pva.QueueEmpty: + self.printReport(f'Server exiting after emptying queue') + self.isDone = True + return except Exception: if self.isDone: return @@ -525,9 +532,15 @@ def main(): server = AdSimServer(inputDirectory=args.input_directory, 
inputFile=args.input_file, mmapMode=args.mmap_mode, hdfDataset=args.hdf_dataset, hdfCompressionMode=args.hdf_compression_mode, frameRate=args.frame_rate, nFrames=args.n_frames, cacheSize=args.cache_size, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period, disableCurses=args.disable_curses) server.start() + expectedRuntime = args.runtime+args.start_delay+server.SHUTDOWN_DELAY + startTime = time.time() try: - waitTime = args.runtime+args.start_delay+server.SHUTDOWN_DELAY - time.sleep(waitTime) + while True: + time.sleep(1) + now = time.time() + runtime = now - startTime + if runtime > expectedRuntime or server.isDone: + break except KeyboardInterrupt as ex: pass server.stop() From d8ac4ea7f66453182f7ff495ee9e2d5e8cf29386 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:30:38 -0600 Subject: [PATCH 11/31] renamed base controller classes --- pvapy/hpc/dataCollectorController.py | 8 +- pvapy/hpc/dataConsumerController.py | 8 +- pvapy/hpc/hpcController.py | 355 +------------------------- pvapy/hpc/mpDataConsumerController.py | 6 +- 4 files changed, 22 insertions(+), 355 deletions(-) diff --git a/pvapy/hpc/dataCollectorController.py b/pvapy/hpc/dataCollectorController.py index 2f9f565..a3c107f 100755 --- a/pvapy/hpc/dataCollectorController.py +++ b/pvapy/hpc/dataCollectorController.py @@ -8,10 +8,10 @@ from .dataProcessingController import DataProcessingController from .sourceChannel import SourceChannel from .dataCollector import DataCollector -from .hpcController import HpcController +from .systemController import SystemController -class DataCollectorController(HpcController): +class DataCollectorController(SystemController): CONTROLLER_TYPE = 'collector' @@ -45,7 +45,7 @@ class 
DataCollectorController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False, collectorId=1, producerIdList='1,2', serverQueueSize=0, monitorQueueSize=-1, collectorCacheSize=-1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.collectorId = collectorId self.producerIdListSpec = producerIdList @@ -67,7 +67,7 @@ def createDataProcessorConfig(self, collectorId): self.logger.debug(f'Processor output channel name: {self.outputChannel}') # Create config dict - return HpcController.createDataProcessorConfig(self, collectorId) + return SystemController.createDataProcessorConfig(self, collectorId) def getStatusTypeDict(self): statusTypeDict = DataCollector.STATUS_TYPE_DICT diff --git a/pvapy/hpc/dataConsumerController.py b/pvapy/hpc/dataConsumerController.py index 
d5838bc..0ab382b 100755 --- a/pvapy/hpc/dataConsumerController.py +++ b/pvapy/hpc/dataConsumerController.py @@ -8,9 +8,9 @@ from .dataProcessingController import DataProcessingController from .sourceChannel import SourceChannel from .dataConsumer import DataConsumer -from .hpcController import HpcController +from .systemController import SystemController -class DataConsumerController(HpcController): +class DataConsumerController(SystemController): CONTROLLER_TYPE = 'consumer' @@ -53,7 +53,7 @@ class DataConsumerController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False, consumerId=1, nConsumers=1, consumerIdList=None, inputProviderType='pva', serverQueueSize=0, monitorQueueSize=-1, accumulateObjects=-1, accumulationTimeout=1, distributorPluginName='pydistributor', distributorGroup=None, distributorSet=None, distributorTrigger=None, distributorUpdates=None, nDistributorSets=1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, 
objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.consumerId = consumerId self.nConsumers = nConsumers if consumerIdList: @@ -103,7 +103,7 @@ def createDataProcessorConfig(self, consumerId): self.logger.debug(f'Processor output channel name: {self.outputChannel}') # Create config dict - return HpcController.createDataProcessorConfig(self, consumerId) + return SystemController.createDataProcessorConfig(self, consumerId) def getStatusTypeDict(self): statusTypeDict = DataConsumer.STATUS_TYPE_DICT diff --git a/pvapy/hpc/hpcController.py b/pvapy/hpc/hpcController.py index bea703b..2855d81 100755 --- a/pvapy/hpc/hpcController.py +++ b/pvapy/hpc/hpcController.py @@ -1,14 +1,7 @@ #!/usr/bin/env python -import json import threading -import time -import pvaccess as pva from ..utility.loggingManager import LoggingManager -from ..utility.objectUtility import ObjectUtility -from ..utility.pvapyPrettyPrinter import PvaPyPrettyPrinter -from ..hpc.sourceChannel import SourceChannel -from ..hpc.dataProcessingController import DataProcessingController class HpcController: @@ -16,359 +9,33 @@ class HpcController: MIN_STATUS_UPDATE_PERIOD = 10.0 COMMAND_EXEC_DELAY = 0.1 + SUCCESS_RETURN_CODE = 0 + ERROR_RETURN_CODE = 1 + GET_STATS_COMMAND = 'get_stats' RESET_STATS_COMMAND = 'reset_stats' CONFIGURE_COMMAND = 'configure' STOP_COMMAND = 'stop' - CONTROLLER_TYPE = 'controller' - - CONTROL_TYPE_DICT = { - 'objectTime' : pva.DOUBLE, - 'objectTimestamp' : pva.PvTimeStamp(), - 'command' : pva.STRING, - 'args' : pva.STRING, - 'statusMessage' : pva.STRING - } - - @classmethod - def getControllerIdField(cls): - return f'{cls.CONTROLLER_TYPE}Id' - - @classmethod - def getControlTypeDict(cls): - d = {cls.getControllerIdField() : pva.UINT} - d.update(cls.CONTROL_TYPE_DICT) - return d + CONTROLLER_TYPE = 'hpc' - @classmethod - def generateIdList(cls, 
listSpec): - # List spec should be given either as range() spec - # or as comma-separated list. - idList = listSpec - if type(listSpec) == str: - if not listSpec.startswith('range') and not listSpec.startswith('['): - idList = f'[{listSpec}]' - idList = eval(idList) - return list(idList) - - def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, processorFile=None, processorClass=None, processorArgs=None, idFormatSpec=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False): - self.lock = threading.Lock() - self.screen = None - self.inputChannel = inputChannel - self.outputChannel = outputChannel - self.statusChannel = statusChannel - self.controlChannel = controlChannel - self.idFormatSpec = idFormatSpec - self.processorFile = processorFile - self.processorClass = processorClass - self.processorArgs = processorArgs - self.objectIdField = objectIdField - self.objectIdOffset = objectIdOffset - self.fieldRequest = fieldRequest - self.skipInitialUpdates = skipInitialUpdates - self.reportStatsList = reportStatsList + def __init__(self, logLevel=None, logFile=None): self.logLevel = logLevel self.logFile = logFile - self.disableCurses = disableCurses - - self.isStopped = True - self.shouldBeStopped = False - self.isRunning = False - self.statsObjectId = 0 - self.statsEnabled = {} - for statsType in ['monitor','queue','processor','user']: - self.statsEnabled[f'{statsType}Stats'] = 'all' in reportStatsList or statsType in reportStatsList - self.prettyPrinter = PvaPyPrettyPrinter() - self.hpcObject = None - self.hpcObjectId = None - - self.logger = self.setupLogging(logLevel, logFile) - self.screen = self.setupCurses(self.disableCurses, self.logLevel) - - def setupLogging(self, logLevel, logFile): - if logLevel: - LoggingManager.setLogLevel(logLevel) - if logFile: - LoggingManager.addFileHandler(logFile) - else: - 
LoggingManager.addStreamHandler() - logger = LoggingManager.getLogger(self.__class__.__name__) - return logger + self.logger = LoggingManager.getLogger(self.__class__.__name__, logLevel, logFile) - def setupCurses(self, disableCurses, logLevel): - screen = None - if not disableCurses and not logLevel: - try: - import curses - screen = curses.initscr() - self.curses = curses - except ImportError as ex: - self.logger.warning(f'Disabling curses library: {ex}') - return screen - - def formatIdString(self, idValue): - if self.idFormatSpec: - return f'{idValue:{self.idFormatSpec}}' - return f'{idValue}' - - def controlCallback(self, pv): - t = time.time() - if 'command' not in pv: - statusMessage = f'Ignoring invalid request (no command specified): {pv}' - self.logger.warning(statusMessage) - self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - return - command = pv['command'] - self.logger.debug(f'Got command: {command}') - if command == self.RESET_STATS_COMMAND: - self.logger.info(f'Control channel: resetting {self.CONTROLLER_TYPE} statistics') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlResetStats) - elif command == self.GET_STATS_COMMAND: - self.logger.info(f'Control channel: getting {self.CONTROLLER_TYPE} statistics') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlGetStats) - elif command == self.CONFIGURE_COMMAND: - args = '' - if 'args' not in pv: - self.logger.debug('Empty keyword arguments string for the configure request') - else: - args = pv['args'] - self.logger.info(f'Control channel: configuring {self.CONTROLLER_TYPE} with args: {args}') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlConfigure, args=[args]) - elif command == self.STOP_COMMAND: - self.logger.info(f'Control channel: stopping {self.CONTROLLER_TYPE}') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlStop) - else: - statusMessage = f'Ignoring invalid request 
(unrecognized command specified): {pv}' - self.logger.warning(statusMessage) - self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - return - statusMessage = 'Command successful' - self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - cTimer.start() - - def controlConfigure(self, configDict): - self.logger.debug(f'Configuring {self.CONTROLLER_TYPE} {self.hpcObjectId} with: {configDict}') - try: - configDict = json.loads(configDict) - self.logger.debug(f'Converted configuration args string from JSON: {configDict}') - except Exception as ex: - self.logger.debug(f'Cannot convert string {configDict} from JSON: {ex}') - try: - self.hpcObject.configure(configDict) - statusMessage = 'Configuration successful' - self.logger.debug(statusMessage) - except Exception as ex: - self.stopScreen() - statusMessage = f'Configuration failed: {ex}' - self.logger.warning(statusMessage) - self.controlPvObject['statusMessage'] = statusMessage - - def controlResetStats(self): - self.logger.debug(f'Resetting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.resetStats() - statusMessage = 'Stats reset successful' - self.controlPvObject['statusMessage'] = statusMessage - - def controlGetStats(self): - self.logger.debug(f'Getting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.reportStats() - statusMessage = 'Stats update successful' - self.controlPvObject['statusMessage'] = statusMessage - - def controlStop(self): - self.logger.debug(f'Stopping {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.shouldBeStopped = True - statusMessage = 'Stop flag set' - self.controlPvObject['statusMessage'] = statusMessage - - def getStatusTypeDict(self): - return {} - - def createOutputChannels(self, hpcObjectId): - self.pvaServer = None - hpcObjectIdString = self.formatIdString(hpcObjectId) - if self.statusChannel or self.controlChannel or 
self.outputChannel: - self.pvaServer = pva.PvaServer() - if self.statusChannel == '_': - self.statusChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:status' - if self.statusChannel: - self.statusChannel = self.statusChannel.replace('*', hpcObjectIdString) - self.logger.debug(f'Status channel name: {self.statusChannel}') - self.statusTypeDict = self.getStatusTypeDict() - if self.statusChannel: - statusPvObject = pva.PvObject(self.statusTypeDict, {f'{self.getControllerIdField()}' : hpcObjectId}) - self.pvaServer.addRecord(self.statusChannel, statusPvObject, None) - self.logger.debug(f'Created {self.CONTROLLER_TYPE} status channel: {self.statusChannel}') - - if self.controlChannel == '_': - self.controlChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:control' - if self.controlChannel: - self.controlChannel = self.controlChannel.replace('*', hpcObjectIdString) - self.logger.debug(f'Control channel name: {self.controlChannel}') - if self.controlChannel: - # Keep reference to the control object so we can - # update it - self.controlPvObject = pva.PvObject(self.getControlTypeDict(), {f'{self.getControllerIdField()}' : hpcObjectId}) - self.pvaServer.addRecord(self.controlChannel, self.controlPvObject, self.controlCallback) - self.logger.debug(f'Created {self.CONTROLLER_TYPE} control channel: {self.controlChannel}') - - def createDataProcessorConfig(self, processorId): - processorConfig = {} - if self.processorArgs: - processorConfig = json.loads(self.processorArgs) - processorConfig['processorId'] = processorId - processorConfig['inputChannel'] = self.inputChannel - processorConfig['outputChannel'] = self.outputChannel - processorConfig['objectIdField'] = self.objectIdField - processorConfig['skipInitialUpdates'] = self.skipInitialUpdates - processorConfig['objectIdOffset'] = self.objectIdOffset - processorConfig['fieldRequest'] = self.fieldRequest - return processorConfig - - def createDataProcessor(self, processorId): - self.processorConfig = 
self.createDataProcessorConfig(processorId) - self.logger.debug(f'Using processor configuration: {self.processorConfig}') - userDataProcessor = None - if self.processorFile and self.processorClass: - userDataProcessor = ObjectUtility.createObjectInstanceFromFile(self.processorFile, 'userDataProcessorModule', self.processorClass, self.processorConfig) - elif self.processorClass: - userDataProcessor = ObjectUtility.createObjectInstanceFromClassPath(self.processorClass, self.processorConfig) - - if userDataProcessor is not None: - self.logger.debug(f'Created data processor {processorId}: {userDataProcessor}') - userDataProcessor.processorId = processorId - userDataProcessor.objectIdField = self.processorConfig['objectIdField'] - self.processingController = DataProcessingController(self.processorConfig, userDataProcessor) - return self.processingController - - def getStatusTypeDict(self, processingController): - return {} + def configure(self, configDict): + pass def start(self): - self.lock.acquire() - try: - if not self.isStopped: - self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already started') - return - self.isStopped = False - self.shouldBeStopped = False - self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is starting') - - try: - self.logger.info(f'Starting hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.start() - self.logger.info(f'Started hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - except Exception as ex: - self.logger.warn(f'Could not start hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}: {ex}') - raise - - if self.pvaServer: - self.pvaServer.start() - finally: - self.lock.release() + pass def reportStats(self, statsDict=None): - if not statsDict: - statsDict = self.getStats() - statsDict[f'{self.getControllerIdField()}'] = self.hpcObjectId - report = self.prettyPrinter.pformat(statsDict) - - if self.screen: - try: - self.screen.erase() - self.screen.addstr(report) - 
self.screen.refresh() - return - except Exception as ex: - # Turn screen off on errors - self.stopScreen() - print(report) + pass def getStats(self): return {} - def processPvUpdate(self, updateWaitTime): - return False - - def stopScreen(self): - if self.screen: - self.curses.endwin() - self.screen = None - def stop(self): - self.lock.acquire() - try: - if self.isStopped: - self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already stopped') - return - if self.isRunning: - # Stop running thread - self.shouldBeStopped = True - self.isStopped = True - self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is stopping') - try: - self.logger.info(f'Stopping hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.stop() - except Exception as ex: - self.logger.warn(f'Could not stop hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - statsDict = self.hpcObject.getStats() - self.stopScreen() - return statsDict - finally: - self.lock.release() - - def run(self, runtime=0, reportPeriod=0): - self.lock.acquire() - try: - if self.isRunning: - self.logger.warn(f'Controller for {self.CONTROLLER_TYPE} {self.hpcObjectId} is already running') - return - self.isRunning = True - self.shouldBeStopped = False - finally: - self.lock.release() - self.start() - startTime = time.time() - lastReportTime = startTime - lastStatusUpdateTime = startTime - waitTime = self.WAIT_TIME - minStatusUpdatePeriod = self.MIN_STATUS_UPDATE_PERIOD - while True: - try: - now = time.time() - wakeTime = now+waitTime - if self.shouldBeStopped: - break - if runtime > 0: - rt = now - startTime - if rt > runtime: - break - if reportPeriod > 0 and now-lastReportTime > reportPeriod: - lastReportTime = now - lastStatusUpdateTime = now - self.reportStats() - - if now-lastStatusUpdateTime > minStatusUpdatePeriod: - lastStatusUpdateTime = now - self.getStats() - - try: - hasProcessedObject = self.processPvUpdate(waitTime) - if not hasProcessedObject: - # 
Check if we need to sleep - delay = wakeTime-time.time() - if delay > 0: - time.sleep(delay) - except Exception as ex: - self.stopScreen() - self.logger.error(f'Processing error: {ex}') - - except KeyboardInterrupt as ex: - break + pass - statsDict = self.stop() - # Allow clients monitoring various channels to get last update - time.sleep(waitTime) - self.reportStats(statsDict) - self.isRunning = False diff --git a/pvapy/hpc/mpDataConsumerController.py b/pvapy/hpc/mpDataConsumerController.py index 870ed94..f44bc88 100755 --- a/pvapy/hpc/mpDataConsumerController.py +++ b/pvapy/hpc/mpDataConsumerController.py @@ -7,10 +7,10 @@ import multiprocessing as mp from ..utility.loggingManager import LoggingManager from .dataConsumer import DataConsumer -from .hpcController import HpcController +from .systemController import SystemController from .dataConsumerController import DataConsumerController -class MpDataConsumerController(HpcController): +class MpDataConsumerController(SystemController): ''' Controller class for a multiple data consumers. 
@@ -51,7 +51,7 @@ class MpDataConsumerController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False, consumerId=1, nConsumers=1, consumerIdList=None, inputProviderType='pva', serverQueueSize=0, monitorQueueSize=-1, accumulateObjects=-1, accumulationTimeout=1, distributorPluginName='pydistributor', distributorGroup=None, distributorSet=None, distributorTrigger=None, distributorUpdates=None, nDistributorSets=1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.consumerId = consumerId # used as a start of the consumer id range self.nConsumers = nConsumers self.consumerIdListSpec = consumerIdList From 7cf9defd6a46351317425d74c75dfdc37120f105 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:33:03 -0600 Subject: [PATCH 12/31] renamed hpc 
controller --- pvapy/hpc/systemController.py | 354 ++++++++++++++++++++++++++++++++++ 1 file changed, 354 insertions(+) create mode 100755 pvapy/hpc/systemController.py diff --git a/pvapy/hpc/systemController.py b/pvapy/hpc/systemController.py new file mode 100755 index 0000000..e54b658 --- /dev/null +++ b/pvapy/hpc/systemController.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python + +import json +import threading +import time +import pvaccess as pva +from ..utility.loggingManager import LoggingManager +from ..utility.objectUtility import ObjectUtility +from ..utility.pvapyPrettyPrinter import PvaPyPrettyPrinter +from .sourceChannel import SourceChannel +from .dataProcessingController import DataProcessingController +from .hpcController import HpcController + +class SystemController(HpcController): + + CONTROLLER_TYPE = 'system' + + CONTROL_TYPE_DICT = { + 'objectTime' : pva.DOUBLE, + 'objectTimestamp' : pva.PvTimeStamp(), + 'command' : pva.STRING, + 'args' : pva.STRING, + 'statusMessage' : pva.STRING + } + + @classmethod + def getControllerIdField(cls): + return f'{cls.CONTROLLER_TYPE}Id' + + @classmethod + def getControlTypeDict(cls): + d = {cls.getControllerIdField() : pva.UINT} + d.update(cls.CONTROL_TYPE_DICT) + return d + + @classmethod + def generateIdList(cls, listSpec): + # List spec should be given either as range() spec + # or as comma-separated list. 
+ idList = listSpec + if type(listSpec) == str: + if not listSpec.startswith('range') and not listSpec.startswith('['): + idList = f'[{listSpec}]' + idList = eval(idList) + return list(idList) + + def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, processorFile=None, processorClass=None, processorArgs=None, idFormatSpec=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False): + HpcController.__init__(self, logLevel=logLevel, logFile=logFile) + self.lock = threading.Lock() + self.screen = None + self.inputChannel = inputChannel + self.outputChannel = outputChannel + self.statusChannel = statusChannel + self.controlChannel = controlChannel + self.idFormatSpec = idFormatSpec + self.processorFile = processorFile + self.processorClass = processorClass + self.processorArgs = processorArgs + self.objectIdField = objectIdField + self.objectIdOffset = objectIdOffset + self.fieldRequest = fieldRequest + self.skipInitialUpdates = skipInitialUpdates + self.reportStatsList = reportStatsList + self.disableCurses = disableCurses + + self.isStopped = True + self.shouldBeStopped = False + self.isRunning = False + self.statsObjectId = 0 + self.statsEnabled = {} + for statsType in ['monitor','queue','processor','user']: + self.statsEnabled[f'{statsType}Stats'] = 'all' in reportStatsList or statsType in reportStatsList + self.prettyPrinter = PvaPyPrettyPrinter() + self.hpcObject = None + self.hpcObjectId = None + + self.screen = self.setupCurses(self.disableCurses, self.logLevel) + + def setupCurses(self, disableCurses, logLevel): + screen = None + if not disableCurses and not logLevel: + try: + import curses + screen = curses.initscr() + self.curses = curses + except ImportError as ex: + self.logger.warning(f'Disabling curses library: {ex}') + return screen + + def formatIdString(self, idValue): + if self.idFormatSpec: + return 
f'{idValue:{self.idFormatSpec}}' + return f'{idValue}' + + def controlCallback(self, pv): + t = time.time() + if 'command' not in pv: + statusMessage = f'Ignoring invalid request (no command specified): {pv}' + self.logger.warning(statusMessage) + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + return + command = pv['command'] + self.logger.debug(f'Got command: {command}') + if command == self.RESET_STATS_COMMAND: + self.logger.info(f'Control channel: resetting {self.CONTROLLER_TYPE} statistics') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlResetStats) + elif command == self.GET_STATS_COMMAND: + self.logger.info(f'Control channel: getting {self.CONTROLLER_TYPE} statistics') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlGetStats) + elif command == self.CONFIGURE_COMMAND: + args = '' + if 'args' not in pv: + self.logger.debug('Empty keyword arguments string for the configure request') + else: + args = pv['args'] + self.logger.info(f'Control channel: configuring {self.CONTROLLER_TYPE} with args: {args}') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlConfigure, args=[args]) + elif command == self.STOP_COMMAND: + self.logger.info(f'Control channel: stopping {self.CONTROLLER_TYPE}') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlStop) + else: + statusMessage = f'Ignoring invalid request (unrecognized command specified): {pv}' + self.logger.warning(statusMessage) + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + return + statusMessage = 'Command successful' + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + cTimer.start() + + def controlConfigure(self, configDict): + self.logger.debug(f'Configuring {self.CONTROLLER_TYPE} {self.hpcObjectId} with: {configDict}') + try: + configDict = 
json.loads(configDict) + self.logger.debug(f'Converted configuration args string from JSON: {configDict}') + except Exception as ex: + self.logger.debug(f'Cannot convert string {configDict} from JSON: {ex}') + try: + self.hpcObject.configure(configDict) + statusMessage = 'Configuration successful' + self.logger.debug(statusMessage) + except Exception as ex: + self.stopScreen() + statusMessage = f'Configuration failed: {ex}' + self.logger.warning(statusMessage) + self.controlPvObject['statusMessage'] = statusMessage + + def controlResetStats(self): + self.logger.debug(f'Resetting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.resetStats() + statusMessage = 'Stats reset successful' + self.controlPvObject['statusMessage'] = statusMessage + + def controlGetStats(self): + self.logger.debug(f'Getting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.reportStats() + statusMessage = 'Stats update successful' + self.controlPvObject['statusMessage'] = statusMessage + + def controlStop(self): + self.logger.debug(f'Stopping {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.shouldBeStopped = True + statusMessage = 'Stop flag set' + self.controlPvObject['statusMessage'] = statusMessage + + def getStatusTypeDict(self): + return {} + + def createOutputChannels(self, hpcObjectId): + self.pvaServer = None + hpcObjectIdString = self.formatIdString(hpcObjectId) + if self.statusChannel or self.controlChannel or self.outputChannel: + self.pvaServer = pva.PvaServer() + if self.statusChannel == '_': + self.statusChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:status' + if self.statusChannel: + self.statusChannel = self.statusChannel.replace('*', hpcObjectIdString) + self.logger.debug(f'Status channel name: {self.statusChannel}') + self.statusTypeDict = self.getStatusTypeDict() + if self.statusChannel: + statusPvObject = pva.PvObject(self.statusTypeDict, {f'{self.getControllerIdField()}' : hpcObjectId}) + 
self.pvaServer.addRecord(self.statusChannel, statusPvObject, None) + self.logger.debug(f'Created {self.CONTROLLER_TYPE} status channel: {self.statusChannel}') + + if self.controlChannel == '_': + self.controlChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:control' + if self.controlChannel: + self.controlChannel = self.controlChannel.replace('*', hpcObjectIdString) + self.logger.debug(f'Control channel name: {self.controlChannel}') + if self.controlChannel: + # Keep reference to the control object so we can + # update it + self.controlPvObject = pva.PvObject(self.getControlTypeDict(), {f'{self.getControllerIdField()}' : hpcObjectId}) + self.pvaServer.addRecord(self.controlChannel, self.controlPvObject, self.controlCallback) + self.logger.debug(f'Created {self.CONTROLLER_TYPE} control channel: {self.controlChannel}') + + def createDataProcessorConfig(self, processorId): + processorConfig = {} + if self.processorArgs: + processorConfig = json.loads(self.processorArgs) + processorConfig['processorId'] = processorId + processorConfig['inputChannel'] = self.inputChannel + processorConfig['outputChannel'] = self.outputChannel + processorConfig['objectIdField'] = self.objectIdField + processorConfig['skipInitialUpdates'] = self.skipInitialUpdates + processorConfig['objectIdOffset'] = self.objectIdOffset + processorConfig['fieldRequest'] = self.fieldRequest + return processorConfig + + def createDataProcessor(self, processorId): + self.processorConfig = self.createDataProcessorConfig(processorId) + self.logger.debug(f'Using processor configuration: {self.processorConfig}') + userDataProcessor = None + if self.processorFile and self.processorClass: + userDataProcessor = ObjectUtility.createObjectInstanceFromFile(self.processorFile, 'userDataProcessorModule', self.processorClass, self.processorConfig) + elif self.processorClass: + userDataProcessor = ObjectUtility.createObjectInstanceFromClassPath(self.processorClass, self.processorConfig) + + if 
userDataProcessor is not None: + self.logger.debug(f'Created data processor {processorId}: {userDataProcessor}') + userDataProcessor.processorId = processorId + userDataProcessor.objectIdField = self.processorConfig['objectIdField'] + self.processingController = DataProcessingController(self.processorConfig, userDataProcessor) + return self.processingController + + def getStatusTypeDict(self, processingController): + return {} + + def start(self): + self.lock.acquire() + try: + if not self.isStopped: + self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already started') + return + self.isStopped = False + self.shouldBeStopped = False + self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is starting') + + try: + self.logger.info(f'Starting hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.start() + self.logger.info(f'Started hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + except Exception as ex: + self.logger.warn(f'Could not start hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}: {ex}') + raise + + if self.pvaServer: + self.pvaServer.start() + finally: + self.lock.release() + + def reportStats(self, statsDict=None): + if not statsDict: + statsDict = self.getStats() + statsDict[f'{self.getControllerIdField()}'] = self.hpcObjectId + report = self.prettyPrinter.pformat(statsDict) + + if self.screen: + try: + self.screen.erase() + self.screen.addstr(report) + self.screen.refresh() + return + except Exception as ex: + # Turn screen off on errors + self.stopScreen() + print(report) + + def getStats(self): + return {} + + def processPvUpdate(self, updateWaitTime): + return False + + def stopScreen(self): + if self.screen: + self.curses.endwin() + self.screen = None + + def stop(self): + self.lock.acquire() + try: + if self.isStopped: + self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already stopped') + return + if self.isRunning: + # Stop running thread + 
self.shouldBeStopped = True + self.isStopped = True + self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is stopping') + try: + self.logger.info(f'Stopping hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.stop() + except Exception as ex: + self.logger.warn(f'Could not stop hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + statsDict = self.hpcObject.getStats() + self.stopScreen() + return statsDict + finally: + self.lock.release() + + def run(self, runtime=0, reportPeriod=0): + self.lock.acquire() + try: + if self.isRunning: + self.logger.warn(f'Controller for {self.CONTROLLER_TYPE} {self.hpcObjectId} is already running') + return + self.isRunning = True + self.shouldBeStopped = False + finally: + self.lock.release() + self.start() + startTime = time.time() + lastReportTime = startTime + lastStatusUpdateTime = startTime + waitTime = self.WAIT_TIME + minStatusUpdatePeriod = self.MIN_STATUS_UPDATE_PERIOD + while True: + try: + now = time.time() + wakeTime = now+waitTime + if self.shouldBeStopped: + break + if runtime > 0: + rt = now - startTime + if rt > runtime: + break + if reportPeriod > 0 and now-lastReportTime > reportPeriod: + lastReportTime = now + lastStatusUpdateTime = now + self.reportStats() + + if now-lastStatusUpdateTime > minStatusUpdatePeriod: + lastStatusUpdateTime = now + self.getStats() + + try: + hasProcessedObject = self.processPvUpdate(waitTime) + if not hasProcessedObject: + # Check if we need to sleep + delay = wakeTime-time.time() + if delay > 0: + time.sleep(delay) + except Exception as ex: + self.stopScreen() + self.logger.error(f'Processing error: {ex}') + + except KeyboardInterrupt as ex: + break + + statsDict = self.stop() + # Allow clients monitoring various channels to get last update + time.sleep(waitTime) + self.reportStats(statsDict) + self.isRunning = False From ed6029bf58fc5f3636e7484694cae89496fa5252 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:34:59 -0600 
Subject: [PATCH 13/31] added ability to spawn user work process --- pvapy/hpc/userWorkProcessController.py | 119 +++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100755 pvapy/hpc/userWorkProcessController.py diff --git a/pvapy/hpc/userWorkProcessController.py b/pvapy/hpc/userWorkProcessController.py new file mode 100755 index 0000000..a265a18 --- /dev/null +++ b/pvapy/hpc/userWorkProcessController.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python + +import os +import queue +import multiprocessing as mp +from ..utility.loggingManager import LoggingManager +from .userWorkProcess import UserWorkProcess +from .hpcController import HpcController + +class UserWorkProcessController(HpcController): + + CONTROLLER_TYPE = 'user' + + ''' + Controller class for user work process. + + **UserWorkProcessController(workerId, userDataProcessor, inputDataQueue, outputDataQueue, logLevel=None, logFile=None)** + + :Parameter: *workerId* (str) - Worker id. + :Parameter: *userDataProceessor* (UserDataProcessor) - Instance of the UserDataProcessor class that will be processing data. + :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue. + :Parameter: *outputDataQueue* (multiprocessing.Queue) - Output data queue. + :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output. + :Parameter: *logFile* (str) - Log file. 
+ ''' + def __init__(self, workerId, userDataProcessor, inputDataQueue, outputDataQueue, logLevel=None, logFile=None): + HpcController.__init__(self, logLevel, logFile) + self.workerId = workerId + self.inputDataQueue = inputDataQueue + self.outputDataQueue = outputDataQueue + self.commandRequestQueue = mp.Queue() + self.commandResponseQueue = mp.Queue() + self.requestId = 0 + self.uwProcess = UserWorkProcess(workerId, userDataProcessor, self.commandRequestQueue, self.commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None) + self.pid = os.getpid() + self.statsDict = {} + self.isStopped = True + + class ProcessNotResponding(Exception): + def __init__(self, args): + Exception.__init__(self, args) + + def _invokeCommandRequest(self, command, args={}): + returnCode = None + try: + requestId = self.requestId + self.requestId += 1 + request = {'requestId' : requestId, 'command' : command} + if args is not None and type(args) == dict: + request.update(args) + self.commandRequestQueue.put(request, block=True, timeout=self.WAIT_TIME) + response = self.commandResponseQueue.get(block=True, timeout=self.WAIT_TIME) + returnCode = response.get('returnCode', self.ERROR_RETURN_CODE) + returnValue = response.get('returnValue') + requestId2 = response.get('requestId') + except queue.Empty: + pass + except Exception as ex: + self.logger.error(f'Error invoking command request {request} for worker {self.workerId}: {ex}') + if returnCode is None: + raise self.ProcessNotResponding(f'No response from worker {self.workerId}') + if returnCode != self.SUCCESS_RETURN_CODE: + error = response.get('error', '') + raise Exception(f'Error response from worker {self.workerId}: {error}') + elif requestId2 != requestId: + raise Exception(f'Invalid response from worker {self.workerId}: request id {requestId} != {requestId2}') + return returnValue + + def start(self): + # Replace interrupt handler for worker processes + # so we can exit cleanly + import signal + 
originalSigintHandler = signal.signal(signal.SIGINT, signal.SIG_IGN) + self.logger.debug(f'Starting worker {self.workerId}') + self.uwProcess.start() + self.logger.debug(f'Started user work process: {self.uwProcess}') + signal.signal(signal.SIGINT, originalSigintHandler) + self.isStopped = False + + def getStats(self): + statsDict = self.statsDict + try: + if not self.isStopped: + statsDict = self._invokeCommandRequest(self.GET_STATS_COMMAND) + self.statsDict = statsDict + except Exception as ex: + self.logger.warn(f'Cannot get stats for worker {self.workerId}: {ex}') + return statsDict + + def resetStats(self): + try: + self._invokeCommandRequest(self.RESET_STATS_COMMAND) + except Exception as ex: + self.logger.error(f'Cannot reset stats for worker {self.workerId}: {ex}') + + def configure(configDict): + try: + self._invokeCommandRequest(self.CONFIGURE_COMMAND, {'configDict' : configDict}) + except Exception as ex: + self.logger.error(f'Cannot configure worker {self.workerId}: {ex}') + + def stop(self): + self.logger.debug(f'Stopping user work process: {self.uwProcess}') + statsDict = self.statsDict + try: + statsDict = self._invokeCommandRequest(self.STOP_COMMAND) + except self.ProcessNotResponding as ex: + pass + except Exception as ex: + self.logger.warn(f'Cannot stop worker {self.workerId}: {ex}') + try: + self.logger.debug(f'Waiting on child process pid {self.uwProcess.pid} (my pid: {self.pid})') + self.uwProcess.join(self.WAIT_TIME) + except: + pass + self.isStopped = True + self.logger.debug(f'User work process {self.workerId} is done') + return statsDict + From 670cf970e5925a140d016bcd7982883f98c06854 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:36:45 -0600 Subject: [PATCH 14/31] allow setting log level and log file while getting logger --- pvapy/utility/loggingManager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pvapy/utility/loggingManager.py b/pvapy/utility/loggingManager.py index 4396e06..201d12c 
100644 --- a/pvapy/utility/loggingManager.py +++ b/pvapy/utility/loggingManager.py @@ -34,7 +34,13 @@ def addHandler(cls, name, handler): logger.addHandler(handler) @classmethod - def getLogger(cls, name): + def getLogger(cls, name, logLevel=None, logFile=None): + if logLevel: + cls.setLogLevel(logLevel) + if logFile: + cls.addFileHandler(logFile) + else: + cls.addStreamHandler() logger = cls.loggerMap.get(name) if logger: return logger From 32eba38268f3ee8857b429aba517f1c86ef29a56 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:37:09 -0600 Subject: [PATCH 15/31] added ability to spawn user work process --- pvapy/hpc/userWorkProcess.py | 128 +++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100755 pvapy/hpc/userWorkProcess.py diff --git a/pvapy/hpc/userWorkProcess.py b/pvapy/hpc/userWorkProcess.py new file mode 100755 index 0000000..6c2f0de --- /dev/null +++ b/pvapy/hpc/userWorkProcess.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python + +import threading +import queue +import os +import multiprocessing as mp +from ..utility.loggingManager import LoggingManager +from .baseController import BaseController + +class UserWorkProcess(mp.Process): + + ''' + User work process class. + + **UserWorkProcess(workerId, userDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None)** + + :Parameter: *workerId* (str) - Worker id. + :Parameter: *userDataProceessor* (UserDataProcessor) - Instance of the UserDataProcessor class that will be processing data. + :Parameter: *commandRequestQueue* (multiprocessing.Queue) - Command request queue. + :Parameter: *commandResponseQueue* (multiprocessing.Queue) - Command response queue. + :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue. + :Parameter: *outputDataQueue* (multiprocessing.Queue) - Output data queue. + :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. 
If not provided, there will be no log output. + :Parameter: *logFile* (str) - Log file. + ''' + def __init__(self, workerId, userDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None): + + mp.Process.__init__(self) + self.logger = LoggingManager.getLogger(self.__class__.__name__, logLevel, logFile) + self.workerId = workerId + self.userDataProcessor = userDataProcessor + + self.inputDataQueue = inputDataQueue + self.outputDataQueue = outputDataQueue + self.commandRequestQueue = commandRequestQueue + self.commandResponseQueue = commandResponseQueue + self.isStopped = True + self.rpThread = RequestProcessingThread(self) + + def start(self): + if self.isStopped: + self.isStopped = False + self.userDataProcessor.start() + mp.Process.start(self) + + def getStats(self): + return self.userDataProcessor.getStats() + + def resetStats(self): + self.userDataProcessor.resetStats() + + def configure(configDict): + self.userDataProcessor.configure(configDict) + + def process(self, data): + return self.userDataProcessor.process(data) + + def stop(self): + if not self.isStopped: + self.isStopped = True + self.userDataProcessor.stop() + return self.getStats() + + def run(self): + self.logger.debug(f'Data processing thread for worker {self.workerId} starting, PID: {os.getpid()}') + self.rpThread.start() + while True: + if self.isStopped: + break + try: + inputData = self.inputDataQueue.get(block=True, timeout=BaseController.WAIT_TIME) + outputData = self.process(inputData) + if outputData is not None: + self.outputDataQueue.put(outputData, block=False) + except queue.Empty: + pass + except Exception as ex: + self.logger.error(f'Data processing error: {ex}') + self.logger.debug(f'Data processing thread for worker {self.workerId} is exiting') + +class RequestProcessingThread(threading.Thread): + + def __init__(self, userWorkProcess): + threading.Thread.__init__(self) + self.userWorkProcess = userWorkProcess + self.logger 
= LoggingManager.getLogger(f'rpThread.{self.userWorkProcess.workerId}') + + def run(self): + self.logger.debug(f'Request processing thread for worker {self.userWorkProcess.workerId} starting') + while True: + if self.userWorkProcess.isStopped: + break + + # Check for new request + try: + response = {} + returnValue = None + request = self.userWorkProcess.commandRequestQueue.get(block=True, timeout=BaseController.WAIT_TIME) + self.logger.debug(f'Received request: {request}') + command = request.get('command') + requestId = request.get('requestId') + response['requestId'] = requestId + if command == BaseController.STOP_COMMAND: + returnValue = self.userWorkProcess.stop() + elif command == BaseController.CONFIGURE_COMMAND: + configDict = request.get('configDict') + self.userWorkProcess.configure(configDict) + elif command == BaseController.RESET_STATS_COMMAND: + self.userWorkProcess.resetStats() + elif command == BaseController.GET_STATS_COMMAND: + returnValue = self.userWorkProcess.getStats() + response['returnCode'] = BaseController.SUCCESS_RETURN_CODE + if returnValue is not None: + response['returnValue'] = returnValue + except queue.Empty: + pass + except Exception as ex: + self.logger.error(f'Request processing error for worker {self.userWorkProcess.workerId}: {ex}') + response['returnCode'] = BaseController.ERROR_RETURN_CODE + response['error'] = str(ex) + try: + if len(response): + self.userWorkProcess.commandResponseQueue.put(response, block=True, timeout=BaseController.WAIT_TIME) + except Exception as ex: + self.logger.error(f'Response processing error for worker {self.userWorkProcess.workerId}: {ex}') + + self.logger.debug(f'Worker {self.userWorkProcess.workerId} is done, request processing thread is exiting') + From 70e3121019c41dfe4cc58f7da989a59f492ee2fc Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 08:39:01 -0600 Subject: [PATCH 16/31] renamed controller class --- pvapy/hpc/userWorkProcess.py | 20 ++++++++++---------- 1 file 
changed, 10 insertions(+), 10 deletions(-) diff --git a/pvapy/hpc/userWorkProcess.py b/pvapy/hpc/userWorkProcess.py index 6c2f0de..e07ef7e 100755 --- a/pvapy/hpc/userWorkProcess.py +++ b/pvapy/hpc/userWorkProcess.py @@ -5,7 +5,7 @@ import os import multiprocessing as mp from ..utility.loggingManager import LoggingManager -from .baseController import BaseController +from .hpcController import HpcController class UserWorkProcess(mp.Process): @@ -68,7 +68,7 @@ def run(self): if self.isStopped: break try: - inputData = self.inputDataQueue.get(block=True, timeout=BaseController.WAIT_TIME) + inputData = self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME) outputData = self.process(inputData) if outputData is not None: self.outputDataQueue.put(outputData, block=False) @@ -95,32 +95,32 @@ def run(self): try: response = {} returnValue = None - request = self.userWorkProcess.commandRequestQueue.get(block=True, timeout=BaseController.WAIT_TIME) + request = self.userWorkProcess.commandRequestQueue.get(block=True, timeout=HpcController.WAIT_TIME) self.logger.debug(f'Received request: {request}') command = request.get('command') requestId = request.get('requestId') response['requestId'] = requestId - if command == BaseController.STOP_COMMAND: + if command == HpcController.STOP_COMMAND: returnValue = self.userWorkProcess.stop() - elif command == BaseController.CONFIGURE_COMMAND: + elif command == HpcController.CONFIGURE_COMMAND: configDict = request.get('configDict') self.userWorkProcess.configure(configDict) - elif command == BaseController.RESET_STATS_COMMAND: + elif command == HpcController.RESET_STATS_COMMAND: self.userWorkProcess.resetStats() - elif command == BaseController.GET_STATS_COMMAND: + elif command == HpcController.GET_STATS_COMMAND: returnValue = self.userWorkProcess.getStats() - response['returnCode'] = BaseController.SUCCESS_RETURN_CODE + response['returnCode'] = HpcController.SUCCESS_RETURN_CODE if returnValue is not None: 
response['returnValue'] = returnValue except queue.Empty: pass except Exception as ex: self.logger.error(f'Request processing error for worker {self.userWorkProcess.workerId}: {ex}') - response['returnCode'] = BaseController.ERROR_RETURN_CODE + response['returnCode'] = HpcController.ERROR_RETURN_CODE response['error'] = str(ex) try: if len(response): - self.userWorkProcess.commandResponseQueue.put(response, block=True, timeout=BaseController.WAIT_TIME) + self.userWorkProcess.commandResponseQueue.put(response, block=True, timeout=HpcController.WAIT_TIME) except Exception as ex: self.logger.error(f'Response processing error for worker {self.userWorkProcess.workerId}: {ex}') From c0a0f0a88f5537dd7a29394f6e5e5c5916d2603f Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 9 Dec 2022 12:57:46 -0600 Subject: [PATCH 17/31] added example for using user work process class --- examples/uwpDataProcessorExample.py | 70 +++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 examples/uwpDataProcessorExample.py diff --git a/examples/uwpDataProcessorExample.py b/examples/uwpDataProcessorExample.py new file mode 100644 index 0000000..a95a31b --- /dev/null +++ b/examples/uwpDataProcessorExample.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +from pvapy.utility.loggingManager import LoggingManager +from pvapy.hpc.userDataProcessor import UserDataProcessor +from pvapy.hpc.userWorkProcessController import UserWorkProcessController +import multiprocessing as mp + +# Example for implementing data processor that spawns separate unix process +class UwpDataProcessor2(UserDataProcessor): + def __init__(self, configDict={}): + UserDataProcessor.__init__(self, configDict) + self.udp = UwpDataProcessor() + self.iq = mp.Queue() + self.oq = mp.Queue() + self.uwpc = UserWorkProcessController(2, self.udp, self.iq, self.oq) + + def start(self): + self.uwpc.start() + + def configure(self, configDict): + self.configure(configDict) + + def process(self, pvObject): + 
self.iq.put(pvObject) + return pvObject + + def resetStats(self): + self.uwpc.resetStats() + + def getStats(self): + return self.uwpc.getStats() + + def stop(self): + self.uwpc.stop() + +class UwpDataProcessor(UserDataProcessor): + + def __init__(self, configDict={}): + UserDataProcessor.__init__(self, configDict) + self.nProcessed = 0 + + # Process monitor update + def process(self, pvObject): + self.nProcessed += 1 + self.logger.debug(f'Processing: {pvObject} (nProcessed: {self.nProcessed})') + return pvObject + + # Reset statistics for user processor + def resetStats(self): + self.nProcessed = 0 + + # Retrieve statistics for user processor + def getStats(self): + return {'nProcessed' : self.nProcessed} + +if __name__ == '__main__': + LoggingManager.addStreamHandler() + LoggingManager.setLogLevel('DEBUG') + udp = UwpDataProcessor2() + iq = mp.Queue() + oq = mp.Queue() + uwpc = UserWorkProcessController(1, udp, iq, oq) + uwpc.start() + import time + for i in range(0,10): + iq.put(i) + time.sleep(1) + print(uwpc.getStats()) + statsDict = uwpc.stop() + print(statsDict) From 4718cfbefc492dcbf20615a885f8f0191a8b1bd5 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Sat, 10 Dec 2022 13:40:34 -0600 Subject: [PATCH 18/31] added user mp data processor --- pvapy/hpc/userMpDataProcessor.py | 65 ++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 pvapy/hpc/userMpDataProcessor.py diff --git a/pvapy/hpc/userMpDataProcessor.py b/pvapy/hpc/userMpDataProcessor.py new file mode 100644 index 0000000..d8bf74c --- /dev/null +++ b/pvapy/hpc/userMpDataProcessor.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +from ..utility.loggingManager import LoggingManager + +class UserMpDataProcessor: + ''' + Class that serves as a base for any user class that will be processing + data from a multiprocessing queue. 
+ The following variables will be set after processor instance is created and before processing starts:\n + \t\- *logger* (logging.Logger) : logger object\n + + **UserMpDataProcessor(configDict={})** + + :Parameter: *processorId* (int) - processor id + ''' + + def __init__(self, processorId=1): + ''' + Constructor. + ''' + self.logger = LoggingManager.getLogger(self.__class__.__name__) + self.processorId = processorId + + def start(self): + ''' + Method invoked at processing startup. + ''' + pass + + def configure(self, configDict): + ''' + Method invoked at user initiated runtime configuration changes. + + :Parameter: *configDict* (dict) - dictionary containing configuration parameters + ''' + pass + + def process(self, mpqObject): + ''' + Data processing method. + + :Parameter: *mpqObject* (object) - object received from multiprocessing queue + ''' + self.logger.debug(f'Processor {self.processorId} processing object {mpqObject}') + + def stop(self): + ''' + Method invoked at processing shutdown. + ''' + pass + + def resetStats(self): + ''' + Method invoked at user initiated application statistics reset. + ''' + pass + + def getStats(self): + ''' + Method invoked periodically for generating application statistics. 
+ + :Returns: Dictionary containing application statistics parameters + ''' + return {} + From 9b2a0828593f58bca1b9b43715995629d73ce8c8 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Sat, 10 Dec 2022 14:13:51 -0600 Subject: [PATCH 19/31] introduced mp data processor interface --- ...rExample.py => umpDataProcessorExample.py} | 26 ++++++++-------- .../{userWorkProcess.py => userMpWorker.py} | 30 ++++++++----------- ...ontroller.py => userMpWorkerController.py} | 14 ++++----- 3 files changed, 31 insertions(+), 39 deletions(-) rename examples/{uwpDataProcessorExample.py => umpDataProcessorExample.py} (67%) rename pvapy/hpc/{userWorkProcess.py => userMpWorker.py} (79%) rename pvapy/hpc/{userWorkProcessController.py => userMpWorkerController.py} (85%) diff --git a/examples/uwpDataProcessorExample.py b/examples/umpDataProcessorExample.py similarity index 67% rename from examples/uwpDataProcessorExample.py rename to examples/umpDataProcessorExample.py index a95a31b..0736577 100644 --- a/examples/uwpDataProcessorExample.py +++ b/examples/umpDataProcessorExample.py @@ -1,18 +1,17 @@ #!/usr/bin/env python from pvapy.utility.loggingManager import LoggingManager -from pvapy.hpc.userDataProcessor import UserDataProcessor -from pvapy.hpc.userWorkProcessController import UserWorkProcessController +from pvapy.hpc.userMpDataProcessor import UserMpDataProcessor +from pvapy.hpc.userMpWorkerController import UserMpWorkerController import multiprocessing as mp # Example for implementing data processor that spawns separate unix process -class UwpDataProcessor2(UserDataProcessor): - def __init__(self, configDict={}): - UserDataProcessor.__init__(self, configDict) - self.udp = UwpDataProcessor() +class UmpDataProcessor2(UserMpDataProcessor): + def __init__(self): + UserMpDataProcessor.__init__(self) + self.udp = UmpDataProcessor() self.iq = mp.Queue() - self.oq = mp.Queue() - self.uwpc = UserWorkProcessController(2, self.udp, self.iq, self.oq) + self.uwpc = UserMpWorkerController(2, 
self.udp, self.iq) def start(self): self.uwpc.start() @@ -33,10 +32,10 @@ def getStats(self): def stop(self): self.uwpc.stop() -class UwpDataProcessor(UserDataProcessor): +class UmpDataProcessor(UserMpDataProcessor): - def __init__(self, configDict={}): - UserDataProcessor.__init__(self, configDict) + def __init__(self): + UserMpDataProcessor.__init__(self) self.nProcessed = 0 # Process monitor update @@ -56,10 +55,9 @@ def getStats(self): if __name__ == '__main__': LoggingManager.addStreamHandler() LoggingManager.setLogLevel('DEBUG') - udp = UwpDataProcessor2() + udp = UmpDataProcessor2() iq = mp.Queue() - oq = mp.Queue() - uwpc = UserWorkProcessController(1, udp, iq, oq) + uwpc = UserMpWorkerController(1, udp, iq) uwpc.start() import time for i in range(0,10): diff --git a/pvapy/hpc/userWorkProcess.py b/pvapy/hpc/userMpWorker.py similarity index 79% rename from pvapy/hpc/userWorkProcess.py rename to pvapy/hpc/userMpWorker.py index e07ef7e..624787e 100755 --- a/pvapy/hpc/userWorkProcess.py +++ b/pvapy/hpc/userMpWorker.py @@ -7,31 +7,29 @@ from ..utility.loggingManager import LoggingManager from .hpcController import HpcController -class UserWorkProcess(mp.Process): +class UserMpWorker(mp.Process): ''' - User work process class. + User multiprocessing worker class. - **UserWorkProcess(workerId, userDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None)** + **UserMpWorker(workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None)** :Parameter: *workerId* (str) - Worker id. - :Parameter: *userDataProceessor* (UserDataProcessor) - Instance of the UserDataProcessor class that will be processing data. + :Parameter: *userMpDataProceessor* (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data. :Parameter: *commandRequestQueue* (multiprocessing.Queue) - Command request queue. 
:Parameter: *commandResponseQueue* (multiprocessing.Queue) - Command response queue. :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue. - :Parameter: *outputDataQueue* (multiprocessing.Queue) - Output data queue. :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output. :Parameter: *logFile* (str) - Log file. ''' - def __init__(self, workerId, userDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None): + def __init__(self, workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None): mp.Process.__init__(self) self.logger = LoggingManager.getLogger(self.__class__.__name__, logLevel, logFile) self.workerId = workerId - self.userDataProcessor = userDataProcessor + self.userMpDataProcessor = userMpDataProcessor self.inputDataQueue = inputDataQueue - self.outputDataQueue = outputDataQueue self.commandRequestQueue = commandRequestQueue self.commandResponseQueue = commandResponseQueue self.isStopped = True @@ -40,25 +38,25 @@ def __init__(self, workerId, userDataProcessor, commandRequestQueue, commandResp def start(self): if self.isStopped: self.isStopped = False - self.userDataProcessor.start() + self.userMpDataProcessor.start() mp.Process.start(self) def getStats(self): - return self.userDataProcessor.getStats() + return self.userMpDataProcessor.getStats() def resetStats(self): - self.userDataProcessor.resetStats() + self.userMpDataProcessor.resetStats() def configure(configDict): - self.userDataProcessor.configure(configDict) + self.userMpDataProcessor.configure(configDict) def process(self, data): - return self.userDataProcessor.process(data) + return self.userMpDataProcessor.process(data) def stop(self): if not self.isStopped: self.isStopped = True - self.userDataProcessor.stop() + self.userMpDataProcessor.stop() return self.getStats() def 
run(self): @@ -69,9 +67,7 @@ def run(self): break try: inputData = self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME) - outputData = self.process(inputData) - if outputData is not None: - self.outputDataQueue.put(outputData, block=False) + self.process(inputData) except queue.Empty: pass except Exception as ex: diff --git a/pvapy/hpc/userWorkProcessController.py b/pvapy/hpc/userMpWorkerController.py similarity index 85% rename from pvapy/hpc/userWorkProcessController.py rename to pvapy/hpc/userMpWorkerController.py index a265a18..d2a8357 100755 --- a/pvapy/hpc/userWorkProcessController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -4,34 +4,32 @@ import queue import multiprocessing as mp from ..utility.loggingManager import LoggingManager -from .userWorkProcess import UserWorkProcess +from .userMpWorker import UserMpWorker from .hpcController import HpcController -class UserWorkProcessController(HpcController): +class UserMpWorkerController(HpcController): CONTROLLER_TYPE = 'user' ''' Controller class for user work process. - **UserWorkProcessController(workerId, userDataProcessor, inputDataQueue, outputDataQueue, logLevel=None, logFile=None)** + **UserMpWorkerController(workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None)** :Parameter: *workerId* (str) - Worker id. - :Parameter: *userDataProceessor* (UserDataProcessor) - Instance of the UserDataProcessor class that will be processing data. + :Parameter: *userMpDataProcessor* (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data. :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue. - :Parameter: *outputDataQueue* (multiprocessing.Queue) - Output data queue. :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output. :Parameter: *logFile* (str) - Log file. 
''' - def __init__(self, workerId, userDataProcessor, inputDataQueue, outputDataQueue, logLevel=None, logFile=None): + def __init__(self, workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None): HpcController.__init__(self, logLevel, logFile) self.workerId = workerId self.inputDataQueue = inputDataQueue - self.outputDataQueue = outputDataQueue self.commandRequestQueue = mp.Queue() self.commandResponseQueue = mp.Queue() self.requestId = 0 - self.uwProcess = UserWorkProcess(workerId, userDataProcessor, self.commandRequestQueue, self.commandResponseQueue, inputDataQueue, outputDataQueue, logLevel=None, logFile=None) + self.uwProcess = UserMpWorker(workerId, userMpDataProcessor, self.commandRequestQueue, self.commandResponseQueue, inputDataQueue, logLevel, logFile) self.pid = os.getpid() self.statsDict = {} self.isStopped = True From 1d3d99fdbc66c1d35c7ab9bc7edb0b2324096c96 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 16 Dec 2022 09:37:58 -0600 Subject: [PATCH 20/31] fix issue with server publishing less than requested number of input frames --- pvapy/cli/adSimServer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index 481ba8a..9deb591 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -361,7 +361,8 @@ def addFrameToCache(self, frameId, ntnda): else: # Using PvObjectQueue try: - self.frameCache.put(ntnda, self.cacheTimeout) + waitTime = self.startDelay + self.cacheTimeout + self.frameCache.put(ntnda, waitTime) except pva.QueueFull: pass @@ -441,6 +442,10 @@ def framePublisher(self): self.pvaServer.update(self.channelName, frame) self.lastPublishedTime = time.time() self.nPublishedFrames += 1 + if self.usingQueue and self.nPublishedFrames >= self.nInputFrames: + self.printReport(f'Server exiting after publishing {self.nPublishedFrames}') + self.isDone = True + return runtime = 0 frameRate = 0 @@ -513,8 +518,8 @@ def main(): 
parser.add_argument('-dt', '--datatype', type=str, dest='datatype', default='uint8', help='Generated datatype. Possible options are int8, uint8, int16, uint16, int32, uint32, float32, float64 (default: uint8; does not apply if input file is given)') parser.add_argument('-mn', '--minimum', type=float, dest='minimum', default=None, help='Minimum generated value (does not apply if input file is given)') parser.add_argument('-mx', '--maximum', type=float, dest='maximum', default=None, help='Maximum generated value (does not apply if input file is given)') - parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=0, help='Number of different frames generate from the input sources; if set to <= 0, the server will use all images found in input files, or it will generate enough images to fill up the image cache if no input files were specified.') - parser.add_argument('-cs', '--cache-size', type=int, dest='cache_size', default=1000, help='Number of different frames to cache (default: 1000); if the cache size is smaller than the number of input frames, the new frames will be constantly regenerated as cached ones are published; otherwise, cached frames will be published over and over again as long as the server is running') + parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=0, help='Number of different frames to generate from the input sources; if set to <= 0, the server will use all images found in input files, or it will generate enough images to fill up the image cache if no input files were specified. 
If the requested number of input frames is greater than the cache size, the server will stop publishing after exhausting generated frames; otherwise, the generated frames will be constantly recycled and republished.') +    parser.add_argument('-cs', '--cache-size', type=int, dest='cache_size', default=1000, help='Number of different frames to cache (default: 1000); if the cache size is smaller than the number of input frames, the new frames will be constantly regenerated as cached ones are published; otherwise, cached frames will be published over and over again as long as the server is running.')      parser.add_argument('-rt', '--runtime', type=float, dest='runtime', default=300, help='Server runtime in seconds (default: 300 seconds)')      parser.add_argument('-cn', '--channel-name', type=str, dest='channel_name', default='pvapy:image', help='Server PVA channel name (default: pvapy:image)')      parser.add_argument('-npv', '--notify-pv', type=str, dest='notify_pv', default=None, help='CA channel that should be notified on start; for the default Area Detector PVA driver PV that controls image acquisition is 13PVA1:cam1:Acquire') From b84115b9ee463349f0dae71ac4cf3fc89a69755a Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 16 Dec 2022 12:43:40 -0600 Subject: [PATCH 21/31] use better logger name for debugging --- pvapy/hpc/userMpWorker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index 624787e..1fdb528 100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -25,7 +25,8 @@ class UserMpWorker(mp.Process): def __init__(self, workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None): mp.Process.__init__(self) - self.logger = LoggingManager.getLogger(self.__class__.__name__, logLevel, logFile) + self.logger = + LoggingManager.getLogger(f'{self.__class__.__name__}.{workerId}', logLevel, logFile) self.workerId = workerId
self.userMpDataProcessor = userMpDataProcessor From f9162b0306c7e1ed27159a220379a7135d192b68 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 16 Dec 2022 13:09:38 -0600 Subject: [PATCH 22/31] fixed typo --- pvapy/hpc/userMpWorker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index 1fdb528..7526527 100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -25,8 +25,7 @@ class UserMpWorker(mp.Process): def __init__(self, workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None): mp.Process.__init__(self) - self.logger = - LoggingManager.getLogger(f'{self.__class__.__name__}.{workerId}', logLevel, logFile) + self.logger = LoggingManager.getLogger(f'{self.__class__.__name__}.{workerId}', logLevel, logFile) self.workerId = workerId self.userMpDataProcessor = userMpDataProcessor From 73b431632f76fe40111e5852f7bcf9e05112d875 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 19 Dec 2022 09:21:38 -0600 Subject: [PATCH 23/31] allow requesting stats with prefix for all keys --- pvapy/hpc/userMpWorkerController.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index d2a8357..98e21dd 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -75,7 +75,7 @@ def start(self): signal.signal(signal.SIGINT, originalSigintHandler) self.isStopped = False - def getStats(self): + def getStats(self, keyPrefix=None): statsDict = self.statsDict try: if not self.isStopped: @@ -83,6 +83,11 @@ def getStats(self): self.statsDict = statsDict except Exception as ex: self.logger.warn(f'Cannot get stats for worker {self.workerId}: {ex}') + if keyPrefix: + statsDict2 = {} + for key, value in statsDict.items(): + statsDict2[f'{keyPrefix}.{key}'] = value + return statsDict2 return statsDict def 
resetStats(self): From 3e53582c42675129c3bf8b4c044f4c5dc3adf8cb Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 19 Dec 2022 09:44:00 -0600 Subject: [PATCH 24/31] use underscore for key prefix separator --- pvapy/hpc/userMpWorkerController.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index 98e21dd..19361a9 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -86,7 +86,7 @@ def getStats(self, keyPrefix=None): if keyPrefix: statsDict2 = {} for key, value in statsDict.items(): - statsDict2[f'{keyPrefix}.{key}'] = value + statsDict2[f'{keyPrefix}_{key}'] = value return statsDict2 return statsDict From 348b45ddeaff544833b425177026fa1905e001da Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 19 Dec 2022 10:10:01 -0600 Subject: [PATCH 25/31] do not try to invoke commands from child process if it has been stopped --- pvapy/hpc/userMpWorkerController.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index 19361a9..4d62804 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -92,13 +92,15 @@ def getStats(self, keyPrefix=None): def resetStats(self): try: - self._invokeCommandRequest(self.RESET_STATS_COMMAND) + if not self.isStopped: + self._invokeCommandRequest(self.RESET_STATS_COMMAND) except Exception as ex: self.logger.error(f'Cannot reset stats for worker {self.workerId}: {ex}') def configure(configDict): try: - self._invokeCommandRequest(self.CONFIGURE_COMMAND, {'configDict' : configDict}) + if not self.isStopped: + self._invokeCommandRequest(self.CONFIGURE_COMMAND, {'configDict' : configDict}) except Exception as ex: self.logger.error(f'Cannot configure worker {self.workerId}: {ex}') @@ -106,7 +108,8 @@ def stop(self): self.logger.debug(f'Stopping user work process: 
{self.uwProcess}') statsDict = self.statsDict try: - statsDict = self._invokeCommandRequest(self.STOP_COMMAND) + if not self.isStopped: + statsDict = self._invokeCommandRequest(self.STOP_COMMAND) except self.ProcessNotResponding as ex: pass except Exception as ex: From fc5ebf590af024ce90f9a21e8286421cab2328ab Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Mon, 19 Dec 2022 10:42:11 -0600 Subject: [PATCH 26/31] cleanup stop interface --- pvapy/hpc/userMpWorkerController.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index 4d62804..430a7ae 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -79,8 +79,12 @@ def getStats(self, keyPrefix=None): statsDict = self.statsDict try: if not self.isStopped: - statsDict = self._invokeCommandRequest(self.GET_STATS_COMMAND) - self.statsDict = statsDict + statsDict2 = self._invokeCommandRequest(self.GET_STATS_COMMAND) + if type(statsDict2) == dict: + statsDict = statsDict2 + self.statsDict = statsDict2 + else: + self.logger.warn(f'Worker {self.workerId} generated invalid stats dict: {statsDict2}') except Exception as ex: self.logger.warn(f'Cannot get stats for worker {self.workerId}: {ex}') if keyPrefix: @@ -104,12 +108,16 @@ def configure(configDict): except Exception as ex: self.logger.error(f'Cannot configure worker {self.workerId}: {ex}') - def stop(self): + def stop(self, keyPrefix=None): self.logger.debug(f'Stopping user work process: {self.uwProcess}') statsDict = self.statsDict + if self.isStopped: + return statsDict try: - if not self.isStopped: - statsDict = self._invokeCommandRequest(self.STOP_COMMAND) + statsDict2 = self._invokeCommandRequest(self.STOP_COMMAND) + if type(statsDict2) == dict: + statsDict = statsDict2 + self.statsDict = statsDict2 except self.ProcessNotResponding as ex: pass except Exception as ex: From d0debcf7f6e975f6d3b309cb3ace158484f472d8 Mon 
Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Tue, 20 Dec 2022 09:33:10 -0600 Subject: [PATCH 27/31] flush input data queue at termination --- pvapy/hpc/userMpWorker.py | 7 +++++++ pvapy/hpc/userMpWorkerController.py | 6 +++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index 7526527..13771ac 100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -55,8 +55,15 @@ def process(self, data): def stop(self): if not self.isStopped: + self.logger.debug(f'Stopping worker {self.workerId}, PID: {os.getpid()}') self.isStopped = True self.userMpDataProcessor.stop() + try: + self.logger.debug(f'Emptying input data queue for worker {self.workerId}, queue size is {self.inputDataQueue.qsize()}') + while not self.inputDataQueue.empty(): + self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME) + except: + pass return self.getStats() def run(self): diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index 430a7ae..6537837 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -113,6 +113,7 @@ def stop(self, keyPrefix=None): statsDict = self.statsDict if self.isStopped: return statsDict + self.isStopped = True try: statsDict2 = self._invokeCommandRequest(self.STOP_COMMAND) if type(statsDict2) == dict: @@ -127,7 +128,10 @@ def stop(self, keyPrefix=None): self.uwProcess.join(self.WAIT_TIME) except: pass - self.isStopped = True + try: + self.uwProcess.kill() + except: + pass self.logger.debug(f'User work process {self.workerId} is done') return statsDict From af7f9898b7a18be58b6fdb3fb9f9fb8d462e5395 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Tue, 20 Dec 2022 09:45:58 -0600 Subject: [PATCH 28/31] add warning for exception at stop --- pvapy/hpc/userMpWorker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index 13771ac..5f8c5e4 
100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -62,8 +62,8 @@ def stop(self): self.logger.debug(f'Emptying input data queue for worker {self.workerId}, queue size is {self.inputDataQueue.qsize()}') while not self.inputDataQueue.empty(): self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME) - except: - pass + except Exception as ex: + self.logger.warn(f'Error emptying input data queue for worker {self.workerId}: {ex}') return self.getStats() def run(self): From bc2f5fa5eb3ac27606a9bfeeb38c40a22b8c8ae0 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Tue, 20 Dec 2022 10:54:29 -0600 Subject: [PATCH 29/31] added documentation for multiprocessing classes --- documentation/sphinx/pvapy.rst | 24 ++++++++ pvapy/hpc/userMpDataProcessor.py | 2 +- pvapy/hpc/userMpWorker.py | 49 ++++++++++++--- pvapy/hpc/userMpWorkerController.py | 93 +++++++++++++++++++---------- 4 files changed, 126 insertions(+), 42 deletions(-) diff --git a/documentation/sphinx/pvapy.rst b/documentation/sphinx/pvapy.rst index db8d2a0..5cb7f53 100644 --- a/documentation/sphinx/pvapy.rst +++ b/documentation/sphinx/pvapy.rst @@ -58,3 +58,27 @@ AdImageDataEncryptor :members: :inherited-members: +UserMpDataProcessor +------------------- + +.. autoclass:: pvapy.hpc.userMpDataProcessor.UserMpDataProcessor() + :show-inheritance: + :members: + :inherited-members: + +UserMpWorker +------------ + +.. autoclass:: pvapy.hpc.userMpWorker.UserMpWorker() + :show-inheritance: + :members: + :inherited-members: + +UserMpWorkerController +---------------------- + +.. 
autoclass:: pvapy.hpc.userMpWorkerController.UserMpWorkerController() + :show-inheritance: + :members: + :inherited-members: + diff --git a/pvapy/hpc/userMpDataProcessor.py b/pvapy/hpc/userMpDataProcessor.py index d8bf74c..c5811b6 100644 --- a/pvapy/hpc/userMpDataProcessor.py +++ b/pvapy/hpc/userMpDataProcessor.py @@ -9,7 +9,7 @@ class UserMpDataProcessor: The following variables will be set after processor instance is created and before processing starts:\n \t\- *logger* (logging.Logger) : logger object\n - **UserMpDataProcessor(configDict={})** + **UserMpDataProcessor(processorId=1)** :Parameter: *processorId* (int) - processor id ''' diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index 5f8c5e4..e66dbf6 100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -36,24 +36,36 @@ def __init__(self, workerId, userMpDataProcessor, commandRequestQueue, commandRe self.rpThread = RequestProcessingThread(self) def start(self): + ''' + Method invoked at processing startup. + ''' if self.isStopped: self.isStopped = False self.userMpDataProcessor.start() mp.Process.start(self) - def getStats(self): - return self.userMpDataProcessor.getStats() - - def resetStats(self): - self.userMpDataProcessor.resetStats() - def configure(configDict): + ''' + Method invoked at user initiated runtime configuration changes. + + :Parameter: *configDict* (dict) - dictionary containing configuration parameters + ''' self.userMpDataProcessor.configure(configDict) - def process(self, data): - return self.userMpDataProcessor.process(data) + def process(self, mpqObject): + ''' + Data processing method. + + :Parameter: *mpqObject* (object) - object received from multiprocessing queue + ''' + return self.userMpDataProcessor.process(mpqObject) def stop(self): + ''' + Method invoked at processing shutdown. 
+ + :Returns: Dictionary containing application statistics parameters + ''' if not self.isStopped: self.logger.debug(f'Stopping worker {self.workerId}, PID: {os.getpid()}') self.isStopped = True @@ -66,7 +78,25 @@ def stop(self): self.logger.warn(f'Error emptying input data queue for worker {self.workerId}: {ex}') return self.getStats() + def resetStats(self): + ''' + Method invoked at user initiated application statistics reset. + ''' + self.userMpDataProcessor.resetStats() + + def getStats(self): + ''' + Method invoked periodically for generating application statistics. + + :Returns: Dictionary containing application statistics parameters + ''' + return self.userMpDataProcessor.getStats() + def run(self): + ''' + Data processing thread. It retrieves objects from the input + queue and invokes user data processor process() method. + ''' self.logger.debug(f'Data processing thread for worker {self.workerId} starting, PID: {os.getpid()}') self.rpThread.start() while True: @@ -82,6 +112,9 @@ def run(self): self.logger.debug(f'Data processing thread for worker {self.workerId} is exiting') class RequestProcessingThread(threading.Thread): + ''' + Request processing thread for the user multiprocessing worker class. + '''' def __init__(self, userWorkProcess): threading.Thread.__init__(self) diff --git a/pvapy/hpc/userMpWorkerController.py b/pvapy/hpc/userMpWorkerController.py index 6537837..3a6d3fb 100755 --- a/pvapy/hpc/userMpWorkerController.py +++ b/pvapy/hpc/userMpWorkerController.py @@ -12,7 +12,7 @@ class UserMpWorkerController(HpcController): CONTROLLER_TYPE = 'user' ''' - Controller class for user work process. + Controller class for user multiprocessing worker process. 
**UserMpWorkerController(workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None)** @@ -38,6 +38,15 @@ class ProcessNotResponding(Exception): def __init__(self, args): Exception.__init__(self, args) + @classmethod + def _renameDictKeys(cls, d, keyPrefix): + if keyPrefix: + d2 = {} + for key, value in d.items(): + d2[f'{keyPrefix}{key}'] = value + return d2 + return d + def _invokeCommandRequest(self, command, args={}): returnCode = None try: @@ -65,51 +74,39 @@ def _invokeCommandRequest(self, command, args={}): return returnValue def start(self): + ''' + Method invoked at processing startup. + ''' # Replace interrupt handler for worker processes - # so we can exit cleanly + # to allow clean exit import signal originalSigintHandler = signal.signal(signal.SIGINT, signal.SIG_IGN) self.logger.debug(f'Starting worker {self.workerId}') self.uwProcess.start() - self.logger.debug(f'Started user work process: {self.uwProcess}') + self.logger.debug(f'Started user worker process: {self.uwProcess}') signal.signal(signal.SIGINT, originalSigintHandler) self.isStopped = False - def getStats(self, keyPrefix=None): - statsDict = self.statsDict - try: - if not self.isStopped: - statsDict2 = self._invokeCommandRequest(self.GET_STATS_COMMAND) - if type(statsDict2) == dict: - statsDict = statsDict2 - self.statsDict = statsDict2 - else: - self.logger.warn(f'Worker {self.workerId} generated invalid stats dict: {statsDict2}') - except Exception as ex: - self.logger.warn(f'Cannot get stats for worker {self.workerId}: {ex}') - if keyPrefix: - statsDict2 = {} - for key, value in statsDict.items(): - statsDict2[f'{keyPrefix}_{key}'] = value - return statsDict2 - return statsDict - - def resetStats(self): - try: - if not self.isStopped: - self._invokeCommandRequest(self.RESET_STATS_COMMAND) - except Exception as ex: - self.logger.error(f'Cannot reset stats for worker {self.workerId}: {ex}') - def configure(configDict): + ''' + Method invoked at user initiated runtime 
configuration changes. + + :Parameter: *configDict* (dict) - dictionary containing configuration parameters + ''' try: if not self.isStopped: self._invokeCommandRequest(self.CONFIGURE_COMMAND, {'configDict' : configDict}) except Exception as ex: self.logger.error(f'Cannot configure worker {self.workerId}: {ex}') - def stop(self, keyPrefix=None): - self.logger.debug(f'Stopping user work process: {self.uwProcess}') + def stop(self, statsKeyPrefix=None): + ''' + Method invoked at processing shutdown. + + :Parameter: *statsKeyPrefix* (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only + :Returns: Dictionary containing application statistics parameters + ''' + self.logger.debug(f'Stopping user worker process: {self.uwProcess}') statsDict = self.statsDict if self.isStopped: return statsDict @@ -132,6 +129,36 @@ def stop(self, keyPrefix=None): self.uwProcess.kill() except: pass - self.logger.debug(f'User work process {self.workerId} is done') - return statsDict + self.logger.debug(f'User worker process {self.workerId} is done') + return self._renameDictKeys(statsDict, statsKeyPrefix) + + def resetStats(self): + ''' + Method invoked at user initiated application statistics reset. + ''' + try: + if not self.isStopped: + self._invokeCommandRequest(self.RESET_STATS_COMMAND) + except Exception as ex: + self.logger.error(f'Cannot reset stats for worker {self.workerId}: {ex}') + + def getStats(self, statsKeyPrefix=None): + ''' + Method invoked periodically for generating application statistics. 
+ + :Parameter: *statsKeyPrefix* (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only + :Returns: Dictionary containing application statistics parameters + ''' + statsDict = self.statsDict + try: + if not self.isStopped: + statsDict2 = self._invokeCommandRequest(self.GET_STATS_COMMAND) + if type(statsDict2) == dict: + statsDict = statsDict2 + self.statsDict = statsDict2 + else: + self.logger.warn(f'Worker {self.workerId} generated invalid stats dict: {statsDict2}') + except Exception as ex: + self.logger.warn(f'Cannot get stats for worker {self.workerId}: {ex}') + return self._renameDictKeys(statsDict, statsKeyPrefix) From 296044fc1e752a26140303b06f491e3690499c99 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Tue, 20 Dec 2022 11:05:34 -0600 Subject: [PATCH 30/31] fix doc string --- pvapy/hpc/userMpWorker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py index e66dbf6..a3abbf3 100755 --- a/pvapy/hpc/userMpWorker.py +++ b/pvapy/hpc/userMpWorker.py @@ -114,7 +114,7 @@ def run(self): class RequestProcessingThread(threading.Thread): ''' Request processing thread for the user multiprocessing worker class. 
- '''' + ''' def __init__(self, userWorkProcess): threading.Thread.__init__(self) From e7b3ad5679ebcf02ba9dbac11e6b5ed35db260df Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Sat, 24 Dec 2022 09:54:24 -0600 Subject: [PATCH 31/31] update boost version --- configure/BUILD.conf | 2 +- documentation/RELEASE_NOTES.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/configure/BUILD.conf b/configure/BUILD.conf index 59dfc1c..e1f64fb 100644 --- a/configure/BUILD.conf +++ b/configure/BUILD.conf @@ -1,5 +1,5 @@ BUILD_NUMBER=1 EPICS_BASE_VERSION=7.0.7 -BOOST_VERSION=1.78.0 +BOOST_VERSION=1.81.0 PVAPY_VERSION=0.0 PVAPY_GIT_VERSION=master diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index 02e0b0d..87aa5aa 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -6,7 +6,7 @@ - added ability to load images from HDF5 files (either compressed or uncompressed) - Conda/pip package dependencies: - EPICS BASE = 7.0.7 - - BOOST = 1.78.0 + - BOOST = 1.81.0 - NUMPY >= 1.22 (for python >= 3.8); >= 1.19, < 1.21 (for python < 3.8) ## Release 5.2.0 (2022/11/04)