diff --git a/configure/BUILD.conf b/configure/BUILD.conf index 9b163ee..efe6c4d 100644 --- a/configure/BUILD.conf +++ b/configure/BUILD.conf @@ -1,5 +1,5 @@ BUILD_NUMBER=1 EPICS_BASE_VERSION=7.0.7 -BOOST_VERSION=1.78.0 -PVAPY_VERSION=5.2.0 +BOOST_VERSION=1.81.0 +PVAPY_VERSION=5.2.1 PVAPY_GIT_VERSION=master diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index 6e78ea3..87aa5aa 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -1,3 +1,14 @@ +## Release 5.2.1 (2022/11/DD) + +- Fixed issue with put into PvObjectQueue when timeout is given +- Updated default value for PvTimeStamp userTag field to 0 +- Area Detector Simulator enhancements: + - added ability to load images from HDF5 files (either compressed or uncompressed) +- Conda/pip package dependencies: + - EPICS BASE = 7.0.7 + - BOOST = 1.81.0 + - NUMPY >= 1.22 (for python >= 3.8); >= 1.19, < 1.21 (for python < 3.8) + ## Release 5.2.0 (2022/11/04) - Streaming Framework enhancements: diff --git a/documentation/sphinx/pvapy.rst b/documentation/sphinx/pvapy.rst index db8d2a0..5cb7f53 100644 --- a/documentation/sphinx/pvapy.rst +++ b/documentation/sphinx/pvapy.rst @@ -58,3 +58,27 @@ AdImageDataEncryptor :members: :inherited-members: +UserMpDataProcessor +------------------- + +.. autoclass:: pvapy.hpc.userMpDataProcessor.UserMpDataProcessor() + :show-inheritance: + :members: + :inherited-members: + +UserMpWorker +------------ + +.. autoclass:: pvapy.hpc.userMpWorker.UserMpWorker() + :show-inheritance: + :members: + :inherited-members: + +UserMpWorkerController +---------------------- + +.. 
autoclass:: pvapy.hpc.userMpWorkerController.UserMpWorkerController() + :show-inheritance: + :members: + :inherited-members: + diff --git a/examples/umpDataProcessorExample.py b/examples/umpDataProcessorExample.py new file mode 100644 index 0000000..0736577 --- /dev/null +++ b/examples/umpDataProcessorExample.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +from pvapy.utility.loggingManager import LoggingManager +from pvapy.hpc.userMpDataProcessor import UserMpDataProcessor +from pvapy.hpc.userMpWorkerController import UserMpWorkerController +import multiprocessing as mp + +# Example for implementing data processor that spawns separate unix process +class UmpDataProcessor2(UserMpDataProcessor): + def __init__(self): + UserMpDataProcessor.__init__(self) + self.udp = UmpDataProcessor() + self.iq = mp.Queue() + self.uwpc = UserMpWorkerController(2, self.udp, self.iq) + + def start(self): + self.uwpc.start() + + def configure(self, configDict): + self.uwpc.configure(configDict) + + def process(self, pvObject): + self.iq.put(pvObject) + return pvObject + + def resetStats(self): + self.uwpc.resetStats() + + def getStats(self): + return self.uwpc.getStats() + + def stop(self): + self.uwpc.stop() + +class UmpDataProcessor(UserMpDataProcessor): + + def __init__(self): + UserMpDataProcessor.__init__(self) + self.nProcessed = 0 + + # Process monitor update + def process(self, pvObject): + self.nProcessed += 1 + self.logger.debug(f'Processing: {pvObject} (nProcessed: {self.nProcessed})') + return pvObject + + # Reset statistics for user processor + def resetStats(self): + self.nProcessed = 0 + + # Retrieve statistics for user processor + def getStats(self): + return {'nProcessed' : self.nProcessed} + +if __name__ == '__main__': + LoggingManager.addStreamHandler() + LoggingManager.setLogLevel('DEBUG') + udp = UmpDataProcessor2() + iq = mp.Queue() + uwpc = UserMpWorkerController(1, udp, iq) + uwpc.start() + import time + for i in range(0,10): + iq.put(i) + time.sleep(1) + 
print(uwpc.getStats()) + statsDict = uwpc.stop() + print(statsDict) diff --git a/pvapy/cli/adSimServer.py b/pvapy/cli/adSimServer.py index 22efef0..9deb591 100755 --- a/pvapy/cli/adSimServer.py +++ b/pvapy/cli/adSimServer.py @@ -5,18 +5,185 @@ import threading import queue import argparse -import numpy as np -import pvaccess as pva import os import os.path +import numpy as np +# HDF5 is optional +try: + import h5py as h5 +except ImportError: + h5 = None +try: + import hdf5plugin +except ImportError: + pass + +import pvaccess as pva from ..utility.adImageUtility import AdImageUtility from ..utility.floatWithUnits import FloatWithUnits from ..utility.intWithUnits import IntWithUnits __version__ = pva.__version__ +class FrameGenerator: + def __init__(self): + self.frames = None + self.nInputFrames = 0 + self.rows = 0 + self.cols = 0 + self.dtype = None + self.compressorName = None + + def getFrameData(self, frameId): + if frameId < self.nInputFrames and frameId >= 0: + return self.frames[frameId] + return None + + def getFrameInfo(self): + if self.frames is not None and not self.nInputFrames: + self.nInputFrames, self.rows, self.cols = self.frames.shape + self.dtype = self.frames.dtype + return (self.nInputFrames, self.rows, self.cols, self.dtype, self.compressorName) + + def getUncompressedFrameSize(self): + return self.rows*self.cols*self.frames[0].itemsize + + def getCompressedFrameSize(self): + if self.compressorName: + return len(self.getFrameData(0)) + else: + return self.getUncompressedFrameSize() + + def getCompressorName(self): + return self.compressorName + +class HdfFileGenerator(FrameGenerator): + + COMPRESSOR_NAME_MAP = { + '32001' : 'blosc' + } + + def __init__(self, filePath, datasetPath, compressionMode=False): + FrameGenerator.__init__(self) + self.filePath = filePath + self.datasetPath = datasetPath + self.dataset = None + self.compressionMode = compressionMode + if not h5: + raise Exception(f'Missing HDF support.') + if not filePath: + raise 
Exception(f'Invalid input file path.') + if not datasetPath: + raise Exception(f'Missing HDF dataset specification for input file {filePath}.') + self.loadInputFile() + + def loadInputFile(self): + try: + self.file = h5.File(self.filePath, 'r') + self.dataset = self.file[self.datasetPath] + self.frames = self.dataset + if self.compressionMode: + for id,params in self.dataset._filters.items(): + compressorName = self.COMPRESSOR_NAME_MAP.get(id) + if compressorName: + self.compressorName = compressorName + break + print(f'Loaded input file {self.filePath} (compressor: {self.compressorName})') + except Exception as ex: + print(f'Cannot load input file {self.filePath}: {ex}') + raise + + def getFrameData(self, frameId): + frameData = None + if frameId < self.nInputFrames and frameId >= 0: + if not self.compressorName: + # Read uncompressed data + frameData = self.frames[frameId] + else: + # Read compressed data directly into numpy array + data = self.dataset.id.read_direct_chunk((frameId,0,0)) + frameData = np.frombuffer(data[1], dtype=np.uint8) + return frameData + +class NumpyFileGenerator(FrameGenerator): + + def __init__(self, filePath, mmapMode): + FrameGenerator.__init__(self) + self.filePath = filePath + self.mmapMode = mmapMode + if not filePath: + raise Exception(f'Invalid input file path.') + self.loadInputFile() + + def loadInputFile(self): + try: + if self.mmapMode: + self.frames = np.load(self.filePath, mmap_mode='r') + else: + self.frames = np.load(self.filePath) + print(f'Loaded input file {self.filePath}') + except Exception as ex: + print(f'Cannot load input file {self.filePath}: {ex}') + raise + +class NumpyRandomGenerator(FrameGenerator): + + def __init__(self, nf, nx, ny, datatype, minimum, maximum): + FrameGenerator.__init__(self) + self.nf = nf + self.nx = nx + self.ny = ny + self.datatype = datatype + self.minimum = minimum + self.maximum = maximum + self.generateFrames() + + def generateFrames(self): + print('Generating random frames') + + # 
Example frame: + # frame = np.array([[0,0,0,0,0,0,0,0,0,0], + # [0,0,0,0,1,1,0,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,1,2,3,2,0,0,0], + # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) + + dt = np.dtype(self.datatype) + if not self.datatype.startswith('float'): + dtinfo = np.iinfo(dt) + mn = dtinfo.min + if self.minimum is not None: + mn = int(max(dtinfo.min, self.minimum)) + mx = dtinfo.max + if self.maximum is not None: + mx = int(min(dtinfo.max, self.maximum)) + self.frames = np.random.randint(mn, mx, size=(self.nf, self.ny, self.nx), dtype=dt) + else: + # Use float32 for min/max, to prevent overflow errors + dtinfo = np.finfo(np.float32) + mn = dtinfo.min + if self.minimum is not None: + mn = float(max(dtinfo.min, self.minimum)) + mx = dtinfo.max + if self.maximum is not None: + mx = float(min(dtinfo.max, self.maximum)) + self.frames = np.random.uniform(mn, mx, size=(self.nf, self.ny, self.nx)) + if self.datatype == 'float32': + self.frames = np.float32(self.frames) + + print(f'Generated frame shape: {self.frames[0].shape}') + print(f'Range of generated values: [{mn},{mx}]') + class AdSimServer: + # Uses frame cache of a given size. If the number of input + # files is larger than the cache size, the server will be constantly + # regenerating frames. 
+ + SHUTDOWN_DELAY = 1.0 + MIN_CACHE_SIZE = 1 + CACHE_TIMEOUT = 1.0 DELAY_CORRECTION = 0.0001 NOTIFICATION_DELAY = 0.1 BYTES_IN_MEGABYTE = 1000000 @@ -25,78 +192,58 @@ class AdSimServer: 'timeStamp' : pva.PvTimeStamp() } - def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod): + def __init__(self, inputDirectory, inputFile, mmapMode, hdfDataset, hdfCompressionMode, frameRate, nFrames, cacheSize, nx, ny, datatype, minimum, maximum, runtime, channelName, notifyPv, notifyPvValue, metadataPv, startDelay, reportPeriod, disableCurses): + self.lock = threading.Lock() self.deltaT = 0 + self.cacheTimeout = self.CACHE_TIMEOUT if frameRate > 0: self.deltaT = 1.0/frameRate + self.cacheTimeout = max(self.CACHE_TIMEOUT, self.deltaT) self.runtime = runtime self.reportPeriod = reportPeriod - self.pvaServer = pva.PvaServer() self.metadataIoc = None + self.frameGeneratorList = [] + self.frameCacheSize = max(cacheSize, self.MIN_CACHE_SIZE) + self.nFrames = nFrames - self.setupMetadataPvs(metadataPv) inputFiles = [] if inputDirectory is not None: inputFiles = [os.path.join(inputDirectory, f) for f in os.listdir(inputDirectory) if os.path.isfile(os.path.join(inputDirectory, f))] if inputFile is not None: inputFiles.append(inputFile) - self.frames = None + allowedHdfExtensions = ['h5', 'hdf', 'hdf5'] for f in inputFiles: - try: - if mmapMode: - newFrames = np.load(f, mmapMode='r') - else: - newFrames = np.load(f) - if self.frames is None: - self.frames = newFrames - else: - self.frames = np.append(self.frames, newFrames, axis=0) - print(f'Loaded input file {f}') - except Exception as ex: - print(f'Cannot load input file {f}, skipping it: {ex}') - if self.frames is None: - print('Generating random frames') - # Example frame: - # frame = np.array([[0,0,0,0,0,0,0,0,0,0], - # [0,0,0,0,1,1,0,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # [0,0,0,1,2,3,2,0,0,0], - # 
[0,0,0,1,2,3,2,0,0,0], - # [0,0,0,0,0,0,0,0,0,0]], dtype=np.uint16) - dt = np.dtype(datatype) - if datatype != 'float32' and datatype != 'float64': - dtinfo = np.iinfo(dt) - mn = dtinfo.min - if minimum is not None: - mn = int(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = int(min(dtinfo.max, maximum)) - self.frames = np.random.randint(mn, mx, size=(nf, ny, nx), dtype=dt) + ext = f.split('.')[-1] + if ext in allowedHdfExtensions: + self.frameGeneratorList.append(HdfFileGenerator(f, hdfDataset, hdfCompressionMode)) else: - # Use float32 for min/max, to prevent overflow errors - dtinfo = np.finfo(np.float32) - mn = dtinfo.min - if minimum is not None: - mn = float(max(dtinfo.min, minimum)) - mx = dtinfo.max - if maximum is not None: - mx = float(min(dtinfo.max, maximum)) - self.frames = np.random.uniform(mn, mx, size=(nf, ny, nx)) - if datatype == 'float32': - self.frames = np.float32(self.frames) - - print(f'Generated frame shape: {self.frames[0].shape}') - print(f'Range of generated values: [{mn},{mx}]') + self.frameGeneratorList.append(NumpyFileGenerator(f, mmapMode)) + + if not self.frameGeneratorList: + nf = nFrames + if nf <= 0: + nf = self.frameCacheSize + self.frameGeneratorList.append(NumpyRandomGenerator(nf, nx, ny, datatype, minimum, maximum)) + + self.nInputFrames = 0 + for fg in self.frameGeneratorList: + nInputFrames, self.rows, self.cols, self.dtype, self.compressorName = fg.getFrameInfo() + self.nInputFrames += nInputFrames + if self.nFrames > 0: + self.nInputFrames = min(self.nFrames, self.nInputFrames) + + fg = self.frameGeneratorList[0] self.frameRate = frameRate - self.nInputFrames, self.rows, self.cols = self.frames.shape - self.imageSize = IntWithUnits(self.rows*self.cols*self.frames[0].itemsize, 'B') - self.expectedDataRate = FloatWithUnits(self.imageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') - print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.imageSize}, type: 
{self.frames.dtype})') - print(f'Expected data rate: {self.expectedDataRate}') + self.uncompressedImageSize = IntWithUnits(fg.getUncompressedFrameSize(), 'B') + self.compressedImageSize = IntWithUnits(fg.getCompressedFrameSize(), 'B') + self.compressedDataRate = FloatWithUnits(self.compressedImageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + self.uncompressedDataRate = FloatWithUnits(self.uncompressedImageSize*self.frameRate/self.BYTES_IN_MEGABYTE, 'MBps') self.channelName = channelName + self.pvaServer = pva.PvaServer() + self.setupMetadataPvs(metadataPv) self.pvaServer.addRecord(self.channelName, pva.NtNdArray(), None) + if notifyPv and notifyPvValue: try: time.sleep(self.NOTIFICATION_DELAY) @@ -106,7 +253,19 @@ def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, d except Exception as ex: print(f'Could not set notification PV {notifyPv} to {notifyPvValue}: {ex}') - self.frameMap = {} + # Use PvObjectQueue if cache size is too small for all input frames + # Otherwise, simple dictionary is good enough + self.usingQueue = False + if self.nInputFrames > self.frameCacheSize: + self.usingQueue = True + self.frameCache = pva.PvObjectQueue(self.frameCacheSize) + else: + self.frameCache = {} + + print(f'Number of input frames: {self.nInputFrames} (size: {self.cols}x{self.rows}, {self.uncompressedImageSize}, type: {self.dtype}, compressor: {self.compressorName}, compressed size: {self.compressedImageSize})') + print(f'Frame cache type: {type(self.frameCache)} (cache size: {self.frameCacheSize})') + print(f'Expected data rate: {self.compressedDataRate} (uncompressed: {self.uncompressedDataRate})') + self.currentFrameId = 0 self.nPublishedFrames = 0 self.startTime = 0 @@ -115,15 +274,17 @@ def __init__(self, inputDirectory, inputFile, mmapMode, frameRate, nf, nx, ny, d self.isDone = False self.screen = None self.screenInitialized = False + self.disableCurses = disableCurses def setupCurses(self): screen = None - try: - import curses - 
screen = curses.initscr() - self.curses = curses - except ImportError as ex: - pass + if not self.disableCurses: + try: + import curses + screen = curses.initscr() + self.curses = curses + except ImportError as ex: + pass return screen def setupMetadataPvs(self, metadataPv): @@ -193,30 +354,64 @@ def updateMetadataPvs(self, metadataValueDict): self.pvaServer.update(mPv, mPvObject) return t - def frameProducer(self, extraFieldsPvObject=None): - for frameId in range(0, self.nInputFrames): - if self.isDone: - return + def addFrameToCache(self, frameId, ntnda): + if not self.usingQueue: + # Using dictionary + self.frameCache[frameId] = ntnda + else: + # Using PvObjectQueue + try: + waitTime = self.startDelay + self.cacheTimeout + self.frameCache.put(ntnda, waitTime) + except pva.QueueFull: + pass + + def getFrameFromCache(self): + if not self.usingQueue: + # Using dictionary + cachedFrameId = self.currentFrameId % self.nInputFrames + if cachedFrameId not in self.frameCache: + # In case frames were not generated on time, just use first frame + cachedFrameId = 0 + ntnda = self.frameCache[cachedFrameId] + else: + # Using PvObjectQueue + ntnda = self.frameCache.get(self.cacheTimeout) + return ntnda - frame = self.frames[frameId] - self.frameMap[frameId] = AdImageUtility.generateNtNdArray2D(frameId, frame, extraFieldsPvObject) + def frameProducer(self, extraFieldsPvObject=None): + startTime = time.time() + frameId = 0 + frameData = None + while not self.isDone: + for fg in self.frameGeneratorList: + nInputFrames, ny, nx, dtype, compressorName = fg.getFrameInfo() + for fgFrameId in range(0,nInputFrames): + if self.isDone or (self.nInputFrames > 0 and frameId >= self.nInputFrames): + break + frameData = fg.getFrameData(fgFrameId) + if frameData is None: + break + ntnda = AdImageUtility.generateNtNdArray2D(frameId, frameData, nx, ny, dtype, compressorName, extraFieldsPvObject) + self.addFrameToCache(frameId, ntnda) + frameId += 1 + if self.isDone or not self.usingQueue or 
frameData is None or (self.nInputFrames > 0 and frameId >= self.nInputFrames): + # All frames are in cache or we cannot generate any more data + break + self.printReport(f'Frame producer is done after {frameId} generated frames') def prepareFrame(self, t=0): # Get cached frame - cachedFrameId = self.currentFrameId % self.nInputFrames - if cachedFrameId not in self.frameMap: - # In case frames were not generated on time, use first frame - cachedFrameId = 0 - frame = self.frameMap[cachedFrameId] - - # Correct image id and timestamps - self.currentFrameId += 1 - frame['uniqueId'] = self.currentFrameId - if t <= 0: - t = time.time() - ts = pva.PvTimeStamp(t) - frame['timeStamp'] = ts - frame['dataTimeStamp'] = ts + frame = self.getFrameFromCache() + if frame is not None: + # Correct image id and timestamps + self.currentFrameId += 1 + frame['uniqueId'] = self.currentFrameId + if t <= 0: + t = time.time() + ts = pva.PvTimeStamp(t) + frame['timeStamp'] = ts + frame['dataTimeStamp'] = ts return frame def framePublisher(self): @@ -232,12 +427,25 @@ def framePublisher(self): # Prepare frame with a given timestamp # so that metadata and image times are as close as possible - frame = self.prepareFrame(updateTime) + try: + frame = self.prepareFrame(updateTime) + except pva.QueueEmpty: + self.printReport(f'Server exiting after emptying queue') + self.isDone = True + return + except Exception: + if self.isDone: + return + raise # Publish frame self.pvaServer.update(self.channelName, frame) self.lastPublishedTime = time.time() self.nPublishedFrames += 1 + if self.usingQueue and self.nPublishedFrames >= self.nInputFrames: + self.printReport(f'Server exiting after publishing {self.nPublishedFrames}') + self.isDone = True + return runtime = 0 frameRate = 0 @@ -249,21 +457,10 @@ def framePublisher(self): self.startTime = self.lastPublishedTime if self.reportPeriod > 0 and (self.nPublishedFrames % self.reportPeriod) == 0: report = 'Published frame id {:6d} @ {:.3f}s (frame rate: 
{:.4f}fps; runtime: {:.3f}s)'.format(self.currentFrameId, self.lastPublishedTime, frameRate, runtime) - if not self.screenInitialized: - self.screenInitialized = True - self.screen = self.setupCurses() - - if self.screen: - self.screen.erase() - self.screen.addstr(f'{report}\n') - self.screen.refresh() - else: - print(report) + self.printReport(report) if runtime > self.runtime: - if self.screen: - self.curses.endwin() - print('Server exiting after reaching runtime of {:.3f} seconds'.format(self.runtime)) + self.printReport(f'Server exiting after reaching runtime of {runtime:.3f} seconds') return if self.deltaT > 0: @@ -273,6 +470,18 @@ def framePublisher(self): threading.Timer(delay, self.framePublisher).start() return + def printReport(self, report): + with self.lock: + if not self.screenInitialized: + self.screenInitialized = True + self.screen = self.setupCurses() + if self.screen: + self.screen.erase() + self.screen.addstr(f'{report}\n') + self.screen.refresh() + else: + print(report) + def start(self): threading.Thread(target=self.frameProducer, daemon=True).start() self.pvaServer.start() @@ -282,9 +491,13 @@ def stop(self): self.isDone = True self.pvaServer.stop() runtime = self.lastPublishedTime - self.startTime - deltaT = runtime/(self.nPublishedFrames - 1) - frameRate = 1.0/deltaT - dataRate = FloatWithUnits(self.imageSize*frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + deltaT = 0 + frameRate = 0 + if self.nPublishedFrames > 1: + deltaT = runtime/(self.nPublishedFrames - 1) + frameRate = 1.0/deltaT + dataRate = FloatWithUnits(self.uncompressedImageSize*frameRate/self.BYTES_IN_MEGABYTE, 'MBps') + time.sleep(self.SHUTDOWN_DELAY) if self.screen: self.curses.endwin() print('\nServer runtime: {:.4f} seconds'.format(runtime)) @@ -297,13 +510,16 @@ def main(): parser.add_argument('-id', '--input-directory', type=str, dest='input_directory', default=None, help='Directory containing input files to be streamed; if input directory or input file are not provided, random 
images will be generated') parser.add_argument('-if', '--input-file', type=str, dest='input_file', default=None, help='Input file to be streamed; if input directory or input file are not provided, random images will be generated') parser.add_argument('-mm', '--mmap-mode', action='store_true', dest='mmap_mode', default=False, help='Use NumPy memory map to load the specified input file. This flag typically results in faster startup and lower memory usage for large files.') + parser.add_argument('-hds', '--hdf-dataset', dest='hdf_dataset', default=None, help='HDF5 dataset path. This option must be specified if HDF5 files are used as input, but otherwise it is ignored.') + parser.add_argument('-hcm', '--hdf-compression-mode', dest='hdf_compression_mode', default=False, action='store_true', help='Use compressed data from HDF5 file. By default, data will be uncompressed before streaming it.') parser.add_argument('-fps', '--frame-rate', type=float, dest='frame_rate', default=20, help='Frames per second (default: 20 fps)') parser.add_argument('-nx', '--n-x-pixels', type=int, dest='n_x_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file file is given)') parser.add_argument('-ny', '--n-y-pixels', type=int, dest='n_y_pixels', default=256, help='Number of pixels in x dimension (default: 256 pixels; does not apply if input file is given)') parser.add_argument('-dt', '--datatype', type=str, dest='datatype', default='uint8', help='Generated datatype. 
Possible options are int8, uint8, int16, uint16, int32, uint32, float32, float64 (default: uint8; does not apply if input file is given)') parser.add_argument('-mn', '--minimum', type=float, dest='minimum', default=None, help='Minimum generated value (does not apply if input file is given)') parser.add_argument('-mx', '--maximum', type=float, dest='maximum', default=None, help='Maximum generated value (does not apply if input file is given)') - parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=1000, help='Number of different frames to generate and cache; those images will be published over and over again as long as the server is running') + parser.add_argument('-nf', '--n-frames', type=int, dest='n_frames', default=0, help='Number of different frames to generate from the input sources; if set to <= 0, the server will use all images found in input files, or it will generate enough images to fill up the image cache if no input files were specified. If the requested number of input frames is greater than the cache size, the server will stop publishing after exhausting generated frames; otherwise, the generated frames will be constantly recycled and republished.') + parser.add_argument('-cs', '--cache-size', type=int, dest='cache_size', default=1000, help='Number of different frames to cache (default: 1000); if the cache size is smaller than the number of input frames, the new frames will be constantly regenerated as cached ones are published; otherwise, cached frames will be published over and over again as long as the server is running.') parser.add_argument('-rt', '--runtime', type=float, dest='runtime', default=300, help='Server runtime in seconds (default: 300 seconds)') parser.add_argument('-cn', '--channel-name', type=str, dest='channel_name', default='pvapy:image', help='Server PVA channel name (default: pvapy:image)') parser.add_argument('-npv', '--notify-pv', type=str, dest='notify_pv', default=None, help='CA channel that should be 
notified on start; for the default Area Detector PVA driver PV that controls image acquisition is 13PVA1:cam1:Acquire') @@ -311,18 +527,25 @@ def main(): parser.add_argument('-mpv', '--metadata-pv', type=str, dest='metadata_pv', default=None, help='Comma-separated list of CA channels that should be contain simulated image metadata values') parser.add_argument('-sd', '--start-delay', type=float, dest='start_delay', default=10.0, help='Server start delay in seconds (default: 10 seconds)') parser.add_argument('-rp', '--report-period', type=int, dest='report_period', default=1, help='Reporting period for publishing frames; if set to <=0 no frames will be reported as published (default: 1)') + parser.add_argument('-dc', '--disable-curses', dest='disable_curses', default=False, action='store_true', help='Disable curses library screen handling. This is enabled by default, except when logging into standard output is turned on.') args, unparsed = parser.parse_known_args() if len(unparsed) > 0: print('Unrecognized argument(s): %s' % ' '.join(unparsed)) exit(1) - server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, frameRate=args.frame_rate, nf=args.n_frames, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period) + server = AdSimServer(inputDirectory=args.input_directory, inputFile=args.input_file, mmapMode=args.mmap_mode, hdfDataset=args.hdf_dataset, hdfCompressionMode=args.hdf_compression_mode, frameRate=args.frame_rate, nFrames=args.n_frames, cacheSize=args.cache_size, nx=args.n_x_pixels, ny=args.n_y_pixels, datatype=args.datatype, minimum=args.minimum, maximum=args.maximum, runtime=args.runtime, channelName=args.channel_name, notifyPv=args.notify_pv, 
notifyPvValue=args.notify_pv_value, metadataPv=args.metadata_pv, startDelay=args.start_delay, reportPeriod=args.report_period, disableCurses=args.disable_curses) server.start() + expectedRuntime = args.runtime+args.start_delay+server.SHUTDOWN_DELAY + startTime = time.time() try: - runtime = args.runtime + 2*args.start_delay - time.sleep(runtime) + while True: + time.sleep(1) + now = time.time() + runtime = now - startTime + if runtime > expectedRuntime or server.isDone: + break except KeyboardInterrupt as ex: pass server.stop() diff --git a/pvapy/hpc/dataCollectorController.py b/pvapy/hpc/dataCollectorController.py index 2f9f565..a3c107f 100755 --- a/pvapy/hpc/dataCollectorController.py +++ b/pvapy/hpc/dataCollectorController.py @@ -8,10 +8,10 @@ from .dataProcessingController import DataProcessingController from .sourceChannel import SourceChannel from .dataCollector import DataCollector -from .hpcController import HpcController +from .systemController import SystemController -class DataCollectorController(HpcController): +class DataCollectorController(SystemController): CONTROLLER_TYPE = 'collector' @@ -45,7 +45,7 @@ class DataCollectorController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False, collectorId=1, producerIdList='1,2', serverQueueSize=0, monitorQueueSize=-1, collectorCacheSize=-1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, 
skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.collectorId = collectorId self.producerIdListSpec = producerIdList @@ -67,7 +67,7 @@ def createDataProcessorConfig(self, collectorId): self.logger.debug(f'Processor output channel name: {self.outputChannel}') # Create config dict - return HpcController.createDataProcessorConfig(self, collectorId) + return SystemController.createDataProcessorConfig(self, collectorId) def getStatusTypeDict(self): statusTypeDict = DataCollector.STATUS_TYPE_DICT diff --git a/pvapy/hpc/dataConsumerController.py b/pvapy/hpc/dataConsumerController.py index d5838bc..0ab382b 100755 --- a/pvapy/hpc/dataConsumerController.py +++ b/pvapy/hpc/dataConsumerController.py @@ -8,9 +8,9 @@ from .dataProcessingController import DataProcessingController from .sourceChannel import SourceChannel from .dataConsumer import DataConsumer -from .hpcController import HpcController +from .systemController import SystemController -class DataConsumerController(HpcController): +class DataConsumerController(SystemController): CONTROLLER_TYPE = 'consumer' @@ -53,7 +53,7 @@ class DataConsumerController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', 
logLevel=None, logFile=None, disableCurses=False, consumerId=1, nConsumers=1, consumerIdList=None, inputProviderType='pva', serverQueueSize=0, monitorQueueSize=-1, accumulateObjects=-1, accumulationTimeout=1, distributorPluginName='pydistributor', distributorGroup=None, distributorSet=None, distributorTrigger=None, distributorUpdates=None, nDistributorSets=1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.consumerId = consumerId self.nConsumers = nConsumers if consumerIdList: @@ -103,7 +103,7 @@ def createDataProcessorConfig(self, consumerId): self.logger.debug(f'Processor output channel name: {self.outputChannel}') # Create config dict - return HpcController.createDataProcessorConfig(self, consumerId) + return SystemController.createDataProcessorConfig(self, consumerId) def getStatusTypeDict(self): statusTypeDict = DataConsumer.STATUS_TYPE_DICT diff --git a/pvapy/hpc/hpcController.py b/pvapy/hpc/hpcController.py index bea703b..2855d81 100755 --- a/pvapy/hpc/hpcController.py +++ b/pvapy/hpc/hpcController.py @@ -1,14 +1,7 @@ #!/usr/bin/env python 
-import json import threading -import time -import pvaccess as pva from ..utility.loggingManager import LoggingManager -from ..utility.objectUtility import ObjectUtility -from ..utility.pvapyPrettyPrinter import PvaPyPrettyPrinter -from ..hpc.sourceChannel import SourceChannel -from ..hpc.dataProcessingController import DataProcessingController class HpcController: @@ -16,359 +9,33 @@ class HpcController: MIN_STATUS_UPDATE_PERIOD = 10.0 COMMAND_EXEC_DELAY = 0.1 + SUCCESS_RETURN_CODE = 0 + ERROR_RETURN_CODE = 1 + GET_STATS_COMMAND = 'get_stats' RESET_STATS_COMMAND = 'reset_stats' CONFIGURE_COMMAND = 'configure' STOP_COMMAND = 'stop' - CONTROLLER_TYPE = 'controller' - - CONTROL_TYPE_DICT = { - 'objectTime' : pva.DOUBLE, - 'objectTimestamp' : pva.PvTimeStamp(), - 'command' : pva.STRING, - 'args' : pva.STRING, - 'statusMessage' : pva.STRING - } - - @classmethod - def getControllerIdField(cls): - return f'{cls.CONTROLLER_TYPE}Id' - - @classmethod - def getControlTypeDict(cls): - d = {cls.getControllerIdField() : pva.UINT} - d.update(cls.CONTROL_TYPE_DICT) - return d + CONTROLLER_TYPE = 'hpc' - @classmethod - def generateIdList(cls, listSpec): - # List spec should be given either as range() spec - # or as comma-separated list. 
- idList = listSpec - if type(listSpec) == str: - if not listSpec.startswith('range') and not listSpec.startswith('['): - idList = f'[{listSpec}]' - idList = eval(idList) - return list(idList) - - def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, processorFile=None, processorClass=None, processorArgs=None, idFormatSpec=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False): - self.lock = threading.Lock() - self.screen = None - self.inputChannel = inputChannel - self.outputChannel = outputChannel - self.statusChannel = statusChannel - self.controlChannel = controlChannel - self.idFormatSpec = idFormatSpec - self.processorFile = processorFile - self.processorClass = processorClass - self.processorArgs = processorArgs - self.objectIdField = objectIdField - self.objectIdOffset = objectIdOffset - self.fieldRequest = fieldRequest - self.skipInitialUpdates = skipInitialUpdates - self.reportStatsList = reportStatsList + def __init__(self, logLevel=None, logFile=None): self.logLevel = logLevel self.logFile = logFile - self.disableCurses = disableCurses - - self.isStopped = True - self.shouldBeStopped = False - self.isRunning = False - self.statsObjectId = 0 - self.statsEnabled = {} - for statsType in ['monitor','queue','processor','user']: - self.statsEnabled[f'{statsType}Stats'] = 'all' in reportStatsList or statsType in reportStatsList - self.prettyPrinter = PvaPyPrettyPrinter() - self.hpcObject = None - self.hpcObjectId = None - - self.logger = self.setupLogging(logLevel, logFile) - self.screen = self.setupCurses(self.disableCurses, self.logLevel) - - def setupLogging(self, logLevel, logFile): - if logLevel: - LoggingManager.setLogLevel(logLevel) - if logFile: - LoggingManager.addFileHandler(logFile) - else: - LoggingManager.addStreamHandler() - logger = LoggingManager.getLogger(self.__class__.__name__) - return logger + 
self.logger = LoggingManager.getLogger(self.__class__.__name__, logLevel, logFile) - def setupCurses(self, disableCurses, logLevel): - screen = None - if not disableCurses and not logLevel: - try: - import curses - screen = curses.initscr() - self.curses = curses - except ImportError as ex: - self.logger.warning(f'Disabling curses library: {ex}') - return screen - - def formatIdString(self, idValue): - if self.idFormatSpec: - return f'{idValue:{self.idFormatSpec}}' - return f'{idValue}' - - def controlCallback(self, pv): - t = time.time() - if 'command' not in pv: - statusMessage = f'Ignoring invalid request (no command specified): {pv}' - self.logger.warning(statusMessage) - self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - return - command = pv['command'] - self.logger.debug(f'Got command: {command}') - if command == self.RESET_STATS_COMMAND: - self.logger.info(f'Control channel: resetting {self.CONTROLLER_TYPE} statistics') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlResetStats) - elif command == self.GET_STATS_COMMAND: - self.logger.info(f'Control channel: getting {self.CONTROLLER_TYPE} statistics') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlGetStats) - elif command == self.CONFIGURE_COMMAND: - args = '' - if 'args' not in pv: - self.logger.debug('Empty keyword arguments string for the configure request') - else: - args = pv['args'] - self.logger.info(f'Control channel: configuring {self.CONTROLLER_TYPE} with args: {args}') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlConfigure, args=[args]) - elif command == self.STOP_COMMAND: - self.logger.info(f'Control channel: stopping {self.CONTROLLER_TYPE}') - cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlStop) - else: - statusMessage = f'Ignoring invalid request (unrecognized command specified): {pv}' - self.logger.warning(statusMessage) - 
self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - return - statusMessage = 'Command successful' - self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) - cTimer.start() - - def controlConfigure(self, configDict): - self.logger.debug(f'Configuring {self.CONTROLLER_TYPE} {self.hpcObjectId} with: {configDict}') - try: - configDict = json.loads(configDict) - self.logger.debug(f'Converted configuration args string from JSON: {configDict}') - except Exception as ex: - self.logger.debug(f'Cannot convert string {configDict} from JSON: {ex}') - try: - self.hpcObject.configure(configDict) - statusMessage = 'Configuration successful' - self.logger.debug(statusMessage) - except Exception as ex: - self.stopScreen() - statusMessage = f'Configuration failed: {ex}' - self.logger.warning(statusMessage) - self.controlPvObject['statusMessage'] = statusMessage - - def controlResetStats(self): - self.logger.debug(f'Resetting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.resetStats() - statusMessage = 'Stats reset successful' - self.controlPvObject['statusMessage'] = statusMessage - - def controlGetStats(self): - self.logger.debug(f'Getting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.reportStats() - statusMessage = 'Stats update successful' - self.controlPvObject['statusMessage'] = statusMessage - - def controlStop(self): - self.logger.debug(f'Stopping {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.shouldBeStopped = True - statusMessage = 'Stop flag set' - self.controlPvObject['statusMessage'] = statusMessage - - def getStatusTypeDict(self): - return {} - - def createOutputChannels(self, hpcObjectId): - self.pvaServer = None - hpcObjectIdString = self.formatIdString(hpcObjectId) - if self.statusChannel or self.controlChannel or self.outputChannel: - self.pvaServer = pva.PvaServer() - if self.statusChannel == 
'_': - self.statusChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:status' - if self.statusChannel: - self.statusChannel = self.statusChannel.replace('*', hpcObjectIdString) - self.logger.debug(f'Status channel name: {self.statusChannel}') - self.statusTypeDict = self.getStatusTypeDict() - if self.statusChannel: - statusPvObject = pva.PvObject(self.statusTypeDict, {f'{self.getControllerIdField()}' : hpcObjectId}) - self.pvaServer.addRecord(self.statusChannel, statusPvObject, None) - self.logger.debug(f'Created {self.CONTROLLER_TYPE} status channel: {self.statusChannel}') - - if self.controlChannel == '_': - self.controlChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:control' - if self.controlChannel: - self.controlChannel = self.controlChannel.replace('*', hpcObjectIdString) - self.logger.debug(f'Control channel name: {self.controlChannel}') - if self.controlChannel: - # Keep reference to the control object so we can - # update it - self.controlPvObject = pva.PvObject(self.getControlTypeDict(), {f'{self.getControllerIdField()}' : hpcObjectId}) - self.pvaServer.addRecord(self.controlChannel, self.controlPvObject, self.controlCallback) - self.logger.debug(f'Created {self.CONTROLLER_TYPE} control channel: {self.controlChannel}') - - def createDataProcessorConfig(self, processorId): - processorConfig = {} - if self.processorArgs: - processorConfig = json.loads(self.processorArgs) - processorConfig['processorId'] = processorId - processorConfig['inputChannel'] = self.inputChannel - processorConfig['outputChannel'] = self.outputChannel - processorConfig['objectIdField'] = self.objectIdField - processorConfig['skipInitialUpdates'] = self.skipInitialUpdates - processorConfig['objectIdOffset'] = self.objectIdOffset - processorConfig['fieldRequest'] = self.fieldRequest - return processorConfig - - def createDataProcessor(self, processorId): - self.processorConfig = self.createDataProcessorConfig(processorId) - self.logger.debug(f'Using processor 
configuration: {self.processorConfig}') - userDataProcessor = None - if self.processorFile and self.processorClass: - userDataProcessor = ObjectUtility.createObjectInstanceFromFile(self.processorFile, 'userDataProcessorModule', self.processorClass, self.processorConfig) - elif self.processorClass: - userDataProcessor = ObjectUtility.createObjectInstanceFromClassPath(self.processorClass, self.processorConfig) - - if userDataProcessor is not None: - self.logger.debug(f'Created data processor {processorId}: {userDataProcessor}') - userDataProcessor.processorId = processorId - userDataProcessor.objectIdField = self.processorConfig['objectIdField'] - self.processingController = DataProcessingController(self.processorConfig, userDataProcessor) - return self.processingController - - def getStatusTypeDict(self, processingController): - return {} + def configure(self, configDict): + pass def start(self): - self.lock.acquire() - try: - if not self.isStopped: - self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already started') - return - self.isStopped = False - self.shouldBeStopped = False - self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is starting') - - try: - self.logger.info(f'Starting hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.start() - self.logger.info(f'Started hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - except Exception as ex: - self.logger.warn(f'Could not start hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}: {ex}') - raise - - if self.pvaServer: - self.pvaServer.start() - finally: - self.lock.release() + pass def reportStats(self, statsDict=None): - if not statsDict: - statsDict = self.getStats() - statsDict[f'{self.getControllerIdField()}'] = self.hpcObjectId - report = self.prettyPrinter.pformat(statsDict) - - if self.screen: - try: - self.screen.erase() - self.screen.addstr(report) - self.screen.refresh() - return - except Exception as ex: - # Turn screen off on errors 
- self.stopScreen() - print(report) + pass def getStats(self): return {} - def processPvUpdate(self, updateWaitTime): - return False - - def stopScreen(self): - if self.screen: - self.curses.endwin() - self.screen = None - def stop(self): - self.lock.acquire() - try: - if self.isStopped: - self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already stopped') - return - if self.isRunning: - # Stop running thread - self.shouldBeStopped = True - self.isStopped = True - self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is stopping') - try: - self.logger.info(f'Stopping hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - self.hpcObject.stop() - except Exception as ex: - self.logger.warn(f'Could not stop hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') - statsDict = self.hpcObject.getStats() - self.stopScreen() - return statsDict - finally: - self.lock.release() - - def run(self, runtime=0, reportPeriod=0): - self.lock.acquire() - try: - if self.isRunning: - self.logger.warn(f'Controller for {self.CONTROLLER_TYPE} {self.hpcObjectId} is already running') - return - self.isRunning = True - self.shouldBeStopped = False - finally: - self.lock.release() - self.start() - startTime = time.time() - lastReportTime = startTime - lastStatusUpdateTime = startTime - waitTime = self.WAIT_TIME - minStatusUpdatePeriod = self.MIN_STATUS_UPDATE_PERIOD - while True: - try: - now = time.time() - wakeTime = now+waitTime - if self.shouldBeStopped: - break - if runtime > 0: - rt = now - startTime - if rt > runtime: - break - if reportPeriod > 0 and now-lastReportTime > reportPeriod: - lastReportTime = now - lastStatusUpdateTime = now - self.reportStats() - - if now-lastStatusUpdateTime > minStatusUpdatePeriod: - lastStatusUpdateTime = now - self.getStats() - - try: - hasProcessedObject = self.processPvUpdate(waitTime) - if not hasProcessedObject: - # Check if we need to sleep - delay = wakeTime-time.time() - if delay > 0: - 
time.sleep(delay) - except Exception as ex: - self.stopScreen() - self.logger.error(f'Processing error: {ex}') - - except KeyboardInterrupt as ex: - break + pass - statsDict = self.stop() - # Allow clients monitoring various channels to get last update - time.sleep(waitTime) - self.reportStats(statsDict) - self.isRunning = False diff --git a/pvapy/hpc/mpDataConsumerController.py b/pvapy/hpc/mpDataConsumerController.py index 870ed94..f44bc88 100755 --- a/pvapy/hpc/mpDataConsumerController.py +++ b/pvapy/hpc/mpDataConsumerController.py @@ -7,10 +7,10 @@ import multiprocessing as mp from ..utility.loggingManager import LoggingManager from .dataConsumer import DataConsumer -from .hpcController import HpcController +from .systemController import SystemController from .dataConsumerController import DataConsumerController -class MpDataConsumerController(HpcController): +class MpDataConsumerController(SystemController): ''' Controller class for a multiple data consumers. @@ -51,7 +51,7 @@ class MpDataConsumerController(HpcController): ''' def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, idFormatSpec=None, processorFile=None, processorClass=None, processorArgs=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False, consumerId=1, nConsumers=1, consumerIdList=None, inputProviderType='pva', serverQueueSize=0, monitorQueueSize=-1, accumulateObjects=-1, accumulationTimeout=1, distributorPluginName='pydistributor', distributorGroup=None, distributorSet=None, distributorTrigger=None, distributorUpdates=None, nDistributorSets=1, metadataChannels=None): - HpcController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, 
objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) + SystemController.__init__(self, inputChannel, outputChannel=outputChannel, statusChannel=statusChannel, controlChannel=controlChannel, idFormatSpec=idFormatSpec, processorFile=processorFile, processorClass=processorClass, processorArgs=processorArgs, objectIdField=objectIdField, objectIdOffset=objectIdOffset, fieldRequest=fieldRequest, skipInitialUpdates=skipInitialUpdates, reportStatsList=reportStatsList, logLevel=logLevel, logFile=logFile, disableCurses=disableCurses) self.consumerId = consumerId # used as a start of the consumer id range self.nConsumers = nConsumers self.consumerIdListSpec = consumerIdList diff --git a/pvapy/hpc/systemController.py b/pvapy/hpc/systemController.py new file mode 100755 index 0000000..e54b658 --- /dev/null +++ b/pvapy/hpc/systemController.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python + +import json +import threading +import time +import pvaccess as pva +from ..utility.loggingManager import LoggingManager +from ..utility.objectUtility import ObjectUtility +from ..utility.pvapyPrettyPrinter import PvaPyPrettyPrinter +from .sourceChannel import SourceChannel +from .dataProcessingController import DataProcessingController +from .hpcController import HpcController + +class SystemController(HpcController): + + CONTROLLER_TYPE = 'system' + + CONTROL_TYPE_DICT = { + 'objectTime' : pva.DOUBLE, + 'objectTimestamp' : pva.PvTimeStamp(), + 'command' : pva.STRING, + 'args' : pva.STRING, + 'statusMessage' : pva.STRING + } + + @classmethod + def getControllerIdField(cls): + return f'{cls.CONTROLLER_TYPE}Id' + + @classmethod + def getControlTypeDict(cls): + d = {cls.getControllerIdField() : pva.UINT} + d.update(cls.CONTROL_TYPE_DICT) + return d + + @classmethod + def generateIdList(cls, listSpec): + # List spec should be given either as range() spec + # or 
as comma-separated list. + idList = listSpec + if type(listSpec) == str: + if not listSpec.startswith('range') and not listSpec.startswith('['): + idList = f'[{listSpec}]' + idList = eval(idList) + return list(idList) + + def __init__(self, inputChannel, outputChannel=None, statusChannel=None, controlChannel=None, processorFile=None, processorClass=None, processorArgs=None, idFormatSpec=None, objectIdField='uniqueId', objectIdOffset=0, fieldRequest='', skipInitialUpdates=1, reportStatsList='all', logLevel=None, logFile=None, disableCurses=False): + HpcController.__init__(self, logLevel=logLevel, logFile=logFile) + self.lock = threading.Lock() + self.screen = None + self.inputChannel = inputChannel + self.outputChannel = outputChannel + self.statusChannel = statusChannel + self.controlChannel = controlChannel + self.idFormatSpec = idFormatSpec + self.processorFile = processorFile + self.processorClass = processorClass + self.processorArgs = processorArgs + self.objectIdField = objectIdField + self.objectIdOffset = objectIdOffset + self.fieldRequest = fieldRequest + self.skipInitialUpdates = skipInitialUpdates + self.reportStatsList = reportStatsList + self.disableCurses = disableCurses + + self.isStopped = True + self.shouldBeStopped = False + self.isRunning = False + self.statsObjectId = 0 + self.statsEnabled = {} + for statsType in ['monitor','queue','processor','user']: + self.statsEnabled[f'{statsType}Stats'] = 'all' in reportStatsList or statsType in reportStatsList + self.prettyPrinter = PvaPyPrettyPrinter() + self.hpcObject = None + self.hpcObjectId = None + + self.screen = self.setupCurses(self.disableCurses, self.logLevel) + + def setupCurses(self, disableCurses, logLevel): + screen = None + if not disableCurses and not logLevel: + try: + import curses + screen = curses.initscr() + self.curses = curses + except ImportError as ex: + self.logger.warning(f'Disabling curses library: {ex}') + return screen + + def formatIdString(self, idValue): + if 
self.idFormatSpec: + return f'{idValue:{self.idFormatSpec}}' + return f'{idValue}' + + def controlCallback(self, pv): + t = time.time() + if 'command' not in pv: + statusMessage = f'Ignoring invalid request (no command specified): {pv}' + self.logger.warning(statusMessage) + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + return + command = pv['command'] + self.logger.debug(f'Got command: {command}') + if command == self.RESET_STATS_COMMAND: + self.logger.info(f'Control channel: resetting {self.CONTROLLER_TYPE} statistics') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlResetStats) + elif command == self.GET_STATS_COMMAND: + self.logger.info(f'Control channel: getting {self.CONTROLLER_TYPE} statistics') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlGetStats) + elif command == self.CONFIGURE_COMMAND: + args = '' + if 'args' not in pv: + self.logger.debug('Empty keyword arguments string for the configure request') + else: + args = pv['args'] + self.logger.info(f'Control channel: configuring {self.CONTROLLER_TYPE} with args: {args}') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlConfigure, args=[args]) + elif command == self.STOP_COMMAND: + self.logger.info(f'Control channel: stopping {self.CONTROLLER_TYPE}') + cTimer = threading.Timer(self.COMMAND_EXEC_DELAY, self.controlStop) + else: + statusMessage = f'Ignoring invalid request (unrecognized command specified): {pv}' + self.logger.warning(statusMessage) + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + return + statusMessage = 'Command successful' + self.controlPvObject.set({'statusMessage' : statusMessage, 'objectTime' : t, 'objectTimestamp' : pva.PvTimeStamp(t)}) + cTimer.start() + + def controlConfigure(self, configDict): + self.logger.debug(f'Configuring {self.CONTROLLER_TYPE} {self.hpcObjectId} with: {configDict}') + 
try: + configDict = json.loads(configDict) + self.logger.debug(f'Converted configuration args string from JSON: {configDict}') + except Exception as ex: + self.logger.debug(f'Cannot convert string {configDict} from JSON: {ex}') + try: + self.hpcObject.configure(configDict) + statusMessage = 'Configuration successful' + self.logger.debug(statusMessage) + except Exception as ex: + self.stopScreen() + statusMessage = f'Configuration failed: {ex}' + self.logger.warning(statusMessage) + self.controlPvObject['statusMessage'] = statusMessage + + def controlResetStats(self): + self.logger.debug(f'Resetting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.resetStats() + statusMessage = 'Stats reset successful' + self.controlPvObject['statusMessage'] = statusMessage + + def controlGetStats(self): + self.logger.debug(f'Getting stats for {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.reportStats() + statusMessage = 'Stats update successful' + self.controlPvObject['statusMessage'] = statusMessage + + def controlStop(self): + self.logger.debug(f'Stopping {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.shouldBeStopped = True + statusMessage = 'Stop flag set' + self.controlPvObject['statusMessage'] = statusMessage + + def getStatusTypeDict(self): + return {} + + def createOutputChannels(self, hpcObjectId): + self.pvaServer = None + hpcObjectIdString = self.formatIdString(hpcObjectId) + if self.statusChannel or self.controlChannel or self.outputChannel: + self.pvaServer = pva.PvaServer() + if self.statusChannel == '_': + self.statusChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:status' + if self.statusChannel: + self.statusChannel = self.statusChannel.replace('*', hpcObjectIdString) + self.logger.debug(f'Status channel name: {self.statusChannel}') + self.statusTypeDict = self.getStatusTypeDict() + if self.statusChannel: + statusPvObject = pva.PvObject(self.statusTypeDict, {f'{self.getControllerIdField()}' : hpcObjectId}) + 
self.pvaServer.addRecord(self.statusChannel, statusPvObject, None) + self.logger.debug(f'Created {self.CONTROLLER_TYPE} status channel: {self.statusChannel}') + + if self.controlChannel == '_': + self.controlChannel = f'pvapy:{self.CONTROLLER_TYPE}:{hpcObjectIdString}:control' + if self.controlChannel: + self.controlChannel = self.controlChannel.replace('*', hpcObjectIdString) + self.logger.debug(f'Control channel name: {self.controlChannel}') + if self.controlChannel: + # Keep reference to the control object so we can + # update it + self.controlPvObject = pva.PvObject(self.getControlTypeDict(), {f'{self.getControllerIdField()}' : hpcObjectId}) + self.pvaServer.addRecord(self.controlChannel, self.controlPvObject, self.controlCallback) + self.logger.debug(f'Created {self.CONTROLLER_TYPE} control channel: {self.controlChannel}') + + def createDataProcessorConfig(self, processorId): + processorConfig = {} + if self.processorArgs: + processorConfig = json.loads(self.processorArgs) + processorConfig['processorId'] = processorId + processorConfig['inputChannel'] = self.inputChannel + processorConfig['outputChannel'] = self.outputChannel + processorConfig['objectIdField'] = self.objectIdField + processorConfig['skipInitialUpdates'] = self.skipInitialUpdates + processorConfig['objectIdOffset'] = self.objectIdOffset + processorConfig['fieldRequest'] = self.fieldRequest + return processorConfig + + def createDataProcessor(self, processorId): + self.processorConfig = self.createDataProcessorConfig(processorId) + self.logger.debug(f'Using processor configuration: {self.processorConfig}') + userDataProcessor = None + if self.processorFile and self.processorClass: + userDataProcessor = ObjectUtility.createObjectInstanceFromFile(self.processorFile, 'userDataProcessorModule', self.processorClass, self.processorConfig) + elif self.processorClass: + userDataProcessor = ObjectUtility.createObjectInstanceFromClassPath(self.processorClass, self.processorConfig) + + if 
userDataProcessor is not None: + self.logger.debug(f'Created data processor {processorId}: {userDataProcessor}') + userDataProcessor.processorId = processorId + userDataProcessor.objectIdField = self.processorConfig['objectIdField'] + self.processingController = DataProcessingController(self.processorConfig, userDataProcessor) + return self.processingController + + def getStatusTypeDict(self, processingController): + return {} + + def start(self): + self.lock.acquire() + try: + if not self.isStopped: + self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already started') + return + self.isStopped = False + self.shouldBeStopped = False + self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is starting') + + try: + self.logger.info(f'Starting hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.start() + self.logger.info(f'Started hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + except Exception as ex: + self.logger.warn(f'Could not start hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}: {ex}') + raise + + if self.pvaServer: + self.pvaServer.start() + finally: + self.lock.release() + + def reportStats(self, statsDict=None): + if not statsDict: + statsDict = self.getStats() + statsDict[f'{self.getControllerIdField()}'] = self.hpcObjectId + report = self.prettyPrinter.pformat(statsDict) + + if self.screen: + try: + self.screen.erase() + self.screen.addstr(report) + self.screen.refresh() + return + except Exception as ex: + # Turn screen off on errors + self.stopScreen() + print(report) + + def getStats(self): + return {} + + def processPvUpdate(self, updateWaitTime): + return False + + def stopScreen(self): + if self.screen: + self.curses.endwin() + self.screen = None + + def stop(self): + self.lock.acquire() + try: + if self.isStopped: + self.logger.warn(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is already stopped') + return + if self.isRunning: + # Stop running thread + 
self.shouldBeStopped = True + self.isStopped = True + self.logger.debug(f'Controller for hpc {self.CONTROLLER_TYPE} {self.hpcObjectId} is stopping') + try: + self.logger.info(f'Stopping hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + self.hpcObject.stop() + except Exception as ex: + self.logger.warn(f'Could not stop hpc {self.CONTROLLER_TYPE} {self.hpcObjectId}') + statsDict = self.hpcObject.getStats() + self.stopScreen() + return statsDict + finally: + self.lock.release() + + def run(self, runtime=0, reportPeriod=0): + self.lock.acquire() + try: + if self.isRunning: + self.logger.warn(f'Controller for {self.CONTROLLER_TYPE} {self.hpcObjectId} is already running') + return + self.isRunning = True + self.shouldBeStopped = False + finally: + self.lock.release() + self.start() + startTime = time.time() + lastReportTime = startTime + lastStatusUpdateTime = startTime + waitTime = self.WAIT_TIME + minStatusUpdatePeriod = self.MIN_STATUS_UPDATE_PERIOD + while True: + try: + now = time.time() + wakeTime = now+waitTime + if self.shouldBeStopped: + break + if runtime > 0: + rt = now - startTime + if rt > runtime: + break + if reportPeriod > 0 and now-lastReportTime > reportPeriod: + lastReportTime = now + lastStatusUpdateTime = now + self.reportStats() + + if now-lastStatusUpdateTime > minStatusUpdatePeriod: + lastStatusUpdateTime = now + self.getStats() + + try: + hasProcessedObject = self.processPvUpdate(waitTime) + if not hasProcessedObject: + # Check if we need to sleep + delay = wakeTime-time.time() + if delay > 0: + time.sleep(delay) + except Exception as ex: + self.stopScreen() + self.logger.error(f'Processing error: {ex}') + + except KeyboardInterrupt as ex: + break + + statsDict = self.stop() + # Allow clients monitoring various channels to get last update + time.sleep(waitTime) + self.reportStats(statsDict) + self.isRunning = False diff --git a/pvapy/hpc/userDataProcessor.py b/pvapy/hpc/userDataProcessor.py index fb0e0ed..9f5bd18 100644 --- 
a/pvapy/hpc/userDataProcessor.py +++ b/pvapy/hpc/userDataProcessor.py @@ -11,6 +11,7 @@ class UserDataProcessor: The following variables will be set after processor instance is created and before processing starts:\n \t\- *logger* (logging.Logger) : logger object\n \t\- *processorId* (int) : processor id\n + \t\- *inputChannel* (str) : input channel\n \t\- *outputChannel* (str) : output channel\n \t\- *objectIdField* (str) : name of the object id field\n \t\- *pvaServer* (PvaServer) : PVA Server instance for publishing output objects\n diff --git a/pvapy/hpc/userMpDataProcessor.py b/pvapy/hpc/userMpDataProcessor.py new file mode 100644 index 0000000..c5811b6 --- /dev/null +++ b/pvapy/hpc/userMpDataProcessor.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +from ..utility.loggingManager import LoggingManager + +class UserMpDataProcessor: + ''' + Class that serves as a base for any user class that will be processing + data from a multiprocessing queue. + The following variables will be set after processor instance is created and before processing starts:\n + \t\- *logger* (logging.Logger) : logger object\n + + **UserMpDataProcessor(processorId=1)** + + :Parameter: *processorId* (int) - processor id + ''' + + def __init__(self, processorId=1): + ''' + Constructor. + ''' + self.logger = LoggingManager.getLogger(self.__class__.__name__) + self.processorId = processorId + + def start(self): + ''' + Method invoked at processing startup. + ''' + pass + + def configure(self, configDict): + ''' + Method invoked at user initiated runtime configuration changes. + + :Parameter: *configDict* (dict) - dictionary containing configuration parameters + ''' + pass + + def process(self, mpqObject): + ''' + Data processing method. + + :Parameter: *mpqObject* (object) - object received from multiprocessing queue + ''' + self.logger.debug(f'Processor {self.processorId} processing object {mpqObject}') + + def stop(self): + ''' + Method invoked at processing shutdown. 
+        '''
+        pass
+
+    def resetStats(self):
+        '''
+        Method invoked at user initiated application statistics reset.
+        '''
+        pass
+
+    def getStats(self):
+        '''
+        Method invoked periodically for generating application statistics.
+
+        :Returns: Dictionary containing application statistics parameters
+        '''
+        return {}
+
diff --git a/pvapy/hpc/userMpWorker.py b/pvapy/hpc/userMpWorker.py
new file mode 100755
index 0000000..a3abbf3
--- /dev/null
+++ b/pvapy/hpc/userMpWorker.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+import threading
+import queue
+import os
+import multiprocessing as mp
+from ..utility.loggingManager import LoggingManager
+from .hpcController import HpcController
+
+class UserMpWorker(mp.Process):
+
+    '''
+    User multiprocessing worker class.
+
+    **UserMpWorker(workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None)**
+
+    :Parameter: *workerId* (str) - Worker id.
+    :Parameter: *userMpDataProcessor* (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data.
+    :Parameter: *commandRequestQueue* (multiprocessing.Queue) - Command request queue.
+    :Parameter: *commandResponseQueue* (multiprocessing.Queue) - Command response queue.
+    :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue.
+    :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output.
+    :Parameter: *logFile* (str) - Log file.
class UserMpWorker(mp.Process):

    '''
    User multiprocessing worker class. The worker process pulls objects from
    the input data queue and hands them to the user data processor, while a
    companion request processing thread services control commands received
    over the command request queue.

    **UserMpWorker(workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None)**

    :Parameter: *workerId* (str) - Worker id.
    :Parameter: *userMpDataProcessor* (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data.
    :Parameter: *commandRequestQueue* (multiprocessing.Queue) - Command request queue.
    :Parameter: *commandResponseQueue* (multiprocessing.Queue) - Command response queue.
    :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue.
    :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output.
    :Parameter: *logFile* (str) - Log file.
    '''
    def __init__(self, workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None):
        '''
        Constructor.
        '''
        mp.Process.__init__(self)
        self.logger = LoggingManager.getLogger(f'{self.__class__.__name__}.{workerId}', logLevel, logFile)
        self.workerId = workerId
        self.userMpDataProcessor = userMpDataProcessor

        self.inputDataQueue = inputDataQueue
        self.commandRequestQueue = commandRequestQueue
        self.commandResponseQueue = commandResponseQueue
        # Guards against repeated start()/stop() calls.
        self.isStopped = True
        self.rpThread = RequestProcessingThread(self)

    def start(self):
        '''
        Method invoked at processing startup. Starts the user data processor
        and then the worker process itself.
        '''
        if self.isStopped:
            self.isStopped = False
            self.userMpDataProcessor.start()
            mp.Process.start(self)

    def configure(self, configDict):
        '''
        Method invoked at user initiated runtime configuration changes.

        :Parameter: *configDict* (dict) - dictionary containing configuration parameters
        '''
        # Fixed signature: the original definition was missing "self", which
        # made the method uncallable on an instance.
        self.userMpDataProcessor.configure(configDict)

    def process(self, mpqObject):
        '''
        Data processing method; delegates to the user data processor.

        :Parameter: *mpqObject* (object) - object received from multiprocessing queue
        '''
        return self.userMpDataProcessor.process(mpqObject)

    def stop(self):
        '''
        Method invoked at processing shutdown. Stops the user data processor
        and drains the input data queue.

        :Returns: Dictionary containing application statistics parameters
        '''
        if not self.isStopped:
            self.logger.debug(f'Stopping worker {self.workerId}, PID: {os.getpid()}')
            self.isStopped = True
            self.userMpDataProcessor.stop()
            try:
                # Drain any remaining input so pending queue items do not
                # prevent a clean process exit.
                self.logger.debug(f'Emptying input data queue for worker {self.workerId}, queue size is {self.inputDataQueue.qsize()}')
                while not self.inputDataQueue.empty():
                    self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME)
            except Exception as ex:
                # Best-effort cleanup; logger.warning replaces deprecated warn().
                self.logger.warning(f'Error emptying input data queue for worker {self.workerId}: {ex}')
        return self.getStats()

    def resetStats(self):
        '''
        Method invoked at user initiated application statistics reset.
        '''
        self.userMpDataProcessor.resetStats()

    def getStats(self):
        '''
        Method invoked periodically for generating application statistics.

        :Returns: Dictionary containing application statistics parameters
        '''
        return self.userMpDataProcessor.getStats()

    def run(self):
        '''
        Data processing thread. It retrieves objects from the input
        queue and invokes user data processor process() method.
        '''
        self.logger.debug(f'Data processing thread for worker {self.workerId} starting, PID: {os.getpid()}')
        self.rpThread.start()
        while True:
            if self.isStopped:
                break
            try:
                # Block with a timeout so the isStopped flag is re-checked
                # periodically.
                inputData = self.inputDataQueue.get(block=True, timeout=HpcController.WAIT_TIME)
                self.process(inputData)
            except queue.Empty:
                pass
            except Exception as ex:
                self.logger.error(f'Data processing error: {ex}')
        self.logger.debug(f'Data processing thread for worker {self.workerId} is exiting')
class RequestProcessingThread(threading.Thread):
    '''
    Request processing thread for the user multiprocessing worker class.
    Runs inside the worker process; services command requests (stop,
    configure, reset stats, get stats) arriving on the worker's command
    request queue and posts results on the command response queue.
    '''

    def __init__(self, userWorkProcess):
        # userWorkProcess is the owning UserMpWorker instance; all queues
        # and the stop flag are accessed through it.
        threading.Thread.__init__(self)
        self.userWorkProcess = userWorkProcess
        self.logger = LoggingManager.getLogger(f'rpThread.{self.userWorkProcess.workerId}')

    def run(self):
        # Loop until the owning worker signals shutdown via isStopped.
        self.logger.debug(f'Request processing thread for worker {self.userWorkProcess.workerId} starting')
        while True:
            if self.userWorkProcess.isStopped:
                break

            # Check for new request
            try:
                # response stays empty on queue timeout, so nothing is sent back.
                response = {}
                returnValue = None
                # Blocking get with timeout so isStopped is re-checked periodically.
                request = self.userWorkProcess.commandRequestQueue.get(block=True, timeout=HpcController.WAIT_TIME)
                self.logger.debug(f'Received request: {request}')
                command = request.get('command')
                requestId = request.get('requestId')
                # Echo the request id so the controller can match responses.
                response['requestId'] = requestId
                # Dispatch the command to the owning worker.
                if command == HpcController.STOP_COMMAND:
                    returnValue = self.userWorkProcess.stop()
                elif command == HpcController.CONFIGURE_COMMAND:
                    configDict = request.get('configDict')
                    self.userWorkProcess.configure(configDict)
                elif command == HpcController.RESET_STATS_COMMAND:
                    self.userWorkProcess.resetStats()
                elif command == HpcController.GET_STATS_COMMAND:
                    returnValue = self.userWorkProcess.getStats()
                response['returnCode'] = HpcController.SUCCESS_RETURN_CODE
                if returnValue is not None:
                    response['returnValue'] = returnValue
            except queue.Empty:
                # No request within WAIT_TIME; go back to checking isStopped.
                pass
            except Exception as ex:
                # Command failed: report the error back to the controller.
                self.logger.error(f'Request processing error for worker {self.userWorkProcess.workerId}: {ex}')
                response['returnCode'] = HpcController.ERROR_RETURN_CODE
                response['error'] = str(ex)
            try:
                # Only send a response if a request was actually received
                # (response is non-empty).
                if len(response):
                    self.userWorkProcess.commandResponseQueue.put(response, block=True, timeout=HpcController.WAIT_TIME)
            except Exception as ex:
                self.logger.error(f'Response processing error for worker {self.userWorkProcess.workerId}: {ex}')

        self.logger.debug(f'Worker {self.userWorkProcess.workerId} is done, request processing thread is exiting')
class UserMpWorkerController(HpcController):
    '''
    Controller class for user multiprocessing worker process. It spawns a
    UserMpWorker process and communicates with it over a pair of command
    request/response queues using a simple request-id based protocol.

    **UserMpWorkerController(workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None)**

    :Parameter: *workerId* (str) - Worker id.
    :Parameter: *userMpDataProcessor* (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data.
    :Parameter: *inputDataQueue* (multiprocessing.Queue) - Input data queue.
    :Parameter: *logLevel* (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output.
    :Parameter: *logFile* (str) - Log file.
    '''
    # NOTE: the docstring above is now placed first so Python actually
    # registers it as the class docstring (it previously followed
    # CONTROLLER_TYPE and was a plain string expression).

    CONTROLLER_TYPE = 'user'

    class ProcessNotResponding(Exception):
        # Raised when the worker process does not produce a command
        # response within WAIT_TIME.
        def __init__(self, args):
            Exception.__init__(self, args)

    def __init__(self, workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None):
        '''
        Constructor.
        '''
        HpcController.__init__(self, logLevel, logFile)
        self.workerId = workerId
        self.inputDataQueue = inputDataQueue
        self.commandRequestQueue = mp.Queue()
        self.commandResponseQueue = mp.Queue()
        self.requestId = 0
        self.uwProcess = UserMpWorker(workerId, userMpDataProcessor, self.commandRequestQueue, self.commandResponseQueue, inputDataQueue, logLevel, logFile)
        self.pid = os.getpid()
        # Last known worker statistics, returned when the worker cannot
        # be queried (e.g. after it has been stopped).
        self.statsDict = {}
        self.isStopped = True

    @classmethod
    def _renameDictKeys(cls, d, keyPrefix):
        # Return a copy of d with every key prefixed by keyPrefix;
        # no-op when the prefix is empty or None.
        if keyPrefix:
            d2 = {}
            for key, value in d.items():
                d2[f'{keyPrefix}{key}'] = value
            return d2
        return d

    def _invokeCommandRequest(self, command, args=None):
        # Send a command request to the worker process and wait for its
        # response. Raises ProcessNotResponding when no response arrives
        # within WAIT_TIME, and Exception on an error response or a
        # request/response id mismatch.
        # (args default changed from a mutable {} to None; behavior is
        # identical since an empty dict contributed nothing to the request.)
        returnCode = None
        try:
            requestId = self.requestId
            self.requestId += 1
            request = {'requestId' : requestId, 'command' : command}
            if args is not None and isinstance(args, dict):
                request.update(args)
            self.commandRequestQueue.put(request, block=True, timeout=self.WAIT_TIME)
            response = self.commandResponseQueue.get(block=True, timeout=self.WAIT_TIME)
            returnCode = response.get('returnCode', self.ERROR_RETURN_CODE)
            returnValue = response.get('returnValue')
            requestId2 = response.get('requestId')
        except queue.Empty:
            # Timeout; handled below via returnCode is None.
            pass
        except Exception as ex:
            self.logger.error(f'Error invoking command request {request} for worker {self.workerId}: {ex}')
        if returnCode is None:
            raise self.ProcessNotResponding(f'No response from worker {self.workerId}')
        if returnCode != self.SUCCESS_RETURN_CODE:
            error = response.get('error', '')
            raise Exception(f'Error response from worker {self.workerId}: {error}')
        elif requestId2 != requestId:
            raise Exception(f'Invalid response from worker {self.workerId}: request id {requestId} != {requestId2}')
        return returnValue

    def start(self):
        '''
        Method invoked at processing startup.
        '''
        # Replace interrupt handler for worker processes
        # to allow clean exit
        import signal
        originalSigintHandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.logger.debug(f'Starting worker {self.workerId}')
        self.uwProcess.start()
        self.logger.debug(f'Started user worker process: {self.uwProcess}')
        signal.signal(signal.SIGINT, originalSigintHandler)
        self.isStopped = False

    def configure(self, configDict):
        '''
        Method invoked at user initiated runtime configuration changes.

        :Parameter: *configDict* (dict) - dictionary containing configuration parameters
        '''
        # Fixed signature: the original definition was missing "self",
        # which made the method uncallable on an instance.
        try:
            if not self.isStopped:
                self._invokeCommandRequest(self.CONFIGURE_COMMAND, {'configDict' : configDict})
        except Exception as ex:
            self.logger.error(f'Cannot configure worker {self.workerId}: {ex}')

    def stop(self, statsKeyPrefix=None):
        '''
        Method invoked at processing shutdown.

        :Parameter: *statsKeyPrefix* (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only
        :Returns: Dictionary containing application statistics parameters
        '''
        self.logger.debug(f'Stopping user worker process: {self.uwProcess}')
        statsDict = self.statsDict
        if self.isStopped:
            # Already stopped: return cached stats, with the same key
            # renaming applied as on the normal path.
            return self._renameDictKeys(statsDict, statsKeyPrefix)
        self.isStopped = True
        try:
            statsDict2 = self._invokeCommandRequest(self.STOP_COMMAND)
            if isinstance(statsDict2, dict):
                statsDict = statsDict2
                self.statsDict = statsDict2
        except self.ProcessNotResponding:
            # Worker already gone; fall through to join/kill cleanup.
            pass
        except Exception as ex:
            self.logger.warning(f'Cannot stop worker {self.workerId}: {ex}')
        try:
            self.logger.debug(f'Waiting on child process pid {self.uwProcess.pid} (my pid: {self.pid})')
            self.uwProcess.join(self.WAIT_TIME)
        except Exception:
            pass
        try:
            # Force-terminate in case the process did not exit within WAIT_TIME.
            self.uwProcess.kill()
        except Exception:
            pass
        self.logger.debug(f'User worker process {self.workerId} is done')
        return self._renameDictKeys(statsDict, statsKeyPrefix)

    def resetStats(self):
        '''
        Method invoked at user initiated application statistics reset.
        '''
        try:
            if not self.isStopped:
                self._invokeCommandRequest(self.RESET_STATS_COMMAND)
        except Exception as ex:
            self.logger.error(f'Cannot reset stats for worker {self.workerId}: {ex}')

    def getStats(self, statsKeyPrefix=None):
        '''
        Method invoked periodically for generating application statistics.

        :Parameter: *statsKeyPrefix* (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only
        :Returns: Dictionary containing application statistics parameters
        '''
        statsDict = self.statsDict
        try:
            if not self.isStopped:
                statsDict2 = self._invokeCommandRequest(self.GET_STATS_COMMAND)
                if isinstance(statsDict2, dict):
                    statsDict = statsDict2
                    self.statsDict = statsDict2
                else:
                    self.logger.warning(f'Worker {self.workerId} generated invalid stats dict: {statsDict2}')
        except Exception as ex:
            self.logger.warning(f'Cannot get stats for worker {self.workerId}: {ex}')
        return self._renameDictKeys(statsDict, statsKeyPrefix)
extraFieldsPvObject=None): + def generateNtNdArray2D(cls, imageId, imageData, nx=None, ny=None, dtype=None, compressorName=None, extraFieldsPvObject=None): if extraFieldsPvObject is None: ntNdArray = pva.NtNdArray() else: ntNdArray = pva.NtNdArray(extraFieldsPvObject.getStructureDict()) - dataFieldKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(image.dtype) - pvaDataType = cls.PVA_DATA_TYPE_MAP.get(image.dtype) - ny, nx = image.shape - data = image.flatten() + dataFieldKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(imageData.dtype) + data = imageData.flatten() + if not compressorName: + pvaDataType = cls.PVA_DATA_TYPE_MAP.get(imageData.dtype) + ny, nx = imageData.shape + size = nx*ny*data.itemsize + ntNdArray['compressedSize'] = size + ntNdArray['uncompressedSize'] = size + else: + dtype = np.dtype(dtype) + pvaDataType = cls.PVA_DATA_TYPE_MAP.get(dtype) + codec = pva.PvCodec(compressorName, pva.PvInt(int(pvaDataType))) + ntNdArray['codec'] = codec + size = nx*ny*dtype.itemsize + ntNdArray['uncompressedSize'] = size + ntNdArray['compressedSize'] = len(data) + ntNdArray['uniqueId'] = int(imageId) dims = [pva.PvDimension(nx, 0, nx, 1, False), \ pva.PvDimension(ny, 0, ny, 1, False)] ntNdArray['dimension'] = dims - size = nx*ny*data.itemsize - ntNdArray['compressedSize'] = size - ntNdArray['uncompressedSize'] = size ts = pva.PvTimeStamp(time.time()) ntNdArray['timeStamp'] = ts ntNdArray['dataTimeStamp'] = ts ntNdArray['descriptor'] = 'Image generated by PvaPy' - pvaTypeKey = cls.NTNDA_DATA_FIELD_KEY_MAP.get(image.dtype) # Alternative way of setting data - #u = pva.PvObject({dataFieldKey : pvaDataType}, {dataFieldKey : data}) + #u = pva.PvObject({dataFieldKey : [pvaDataType]}, {dataFieldKey : data}) #ntNdArray.setUnion(u) - ntNdArray['value'] = {pvaTypeKey : data} + ntNdArray['value'] = {dataFieldKey : data} attrs = [pva.NtAttribute('ColorMode', pva.PvInt(0))] ntNdArray['attribute'] = attrs if extraFieldsPvObject is not None: @@ -186,7 +193,7 @@ def replaceNtNdArrayImage2D(cls, 
ntNdArray, imageId, image, extraFieldsPvObject= ntNdArray['timeStamp'] = ts ntNdArray['dataTimeStamp'] = ts - u = pva.PvObject({dataFieldKey : pvaDataType}, {dataFieldKey : data}) + u = pva.PvObject({dataFieldKey : [pvaDataType]}, {dataFieldKey : data}) ntNdArray.setUnion(u) if extraFieldsPvObject is not None: ntNdArray.set(extraFieldsPvObject) diff --git a/pvapy/utility/loggingManager.py b/pvapy/utility/loggingManager.py index 4396e06..201d12c 100644 --- a/pvapy/utility/loggingManager.py +++ b/pvapy/utility/loggingManager.py @@ -34,7 +34,13 @@ def addHandler(cls, name, handler): logger.addHandler(handler) @classmethod - def getLogger(cls, name): + def getLogger(cls, name, logLevel=None, logFile=None): + if logLevel: + cls.setLogLevel(logLevel) + if logFile: + cls.addFileHandler(logFile) + else: + cls.addStreamHandler() logger = cls.loggerMap.get(name) if logger: return logger diff --git a/src/pvaccess/PvTimeStamp.cpp b/src/pvaccess/PvTimeStamp.cpp index 3e334cc..66c1783 100644 --- a/src/pvaccess/PvTimeStamp.cpp +++ b/src/pvaccess/PvTimeStamp.cpp @@ -11,7 +11,7 @@ const char* PvTimeStamp::SecondsPastEpochFieldKey("secondsPastEpoch"); const char* PvTimeStamp::NanosecondsFieldKey("nanoseconds"); const char* PvTimeStamp::UserTagFieldKey("userTag"); -const int PvTimeStamp::UnknownUserTag(-1); +const int PvTimeStamp::UnknownUserTag(0); boost::python::dict PvTimeStamp::createStructureDict() { @@ -48,6 +48,16 @@ PvTimeStamp::PvTimeStamp(double time) : setUserTag(UnknownUserTag); } +PvTimeStamp::PvTimeStamp(double time, int userTag) : + PvObject(createStructureDict(), StructureId) +{ + long long secondsPastEpoch(time); + int nanoseconds((time-secondsPastEpoch)*NanosecondsInSecond); + setSecondsPastEpoch(secondsPastEpoch); + setNanoseconds(nanoseconds); + setUserTag(userTag); +} + PvTimeStamp::PvTimeStamp(const epicsTimeStamp& ts) : PvObject(createStructureDict(), StructureId) { diff --git a/src/pvaccess/PvTimeStamp.h b/src/pvaccess/PvTimeStamp.h index 0344233..06ce043 
100644 --- a/src/pvaccess/PvTimeStamp.h +++ b/src/pvaccess/PvTimeStamp.h @@ -29,6 +29,7 @@ class PvTimeStamp : public PvObject // Instance methods PvTimeStamp(); PvTimeStamp(double time); + PvTimeStamp(double time, int userTag); PvTimeStamp(const epicsTimeStamp* ts); PvTimeStamp(const epicsTimeStamp& ts); PvTimeStamp(long long secondsPastEpoch, int nanoseconds); diff --git a/src/pvaccess/SynchronizedQueue.h b/src/pvaccess/SynchronizedQueue.h index 14d54e9..b6c5d24 100644 --- a/src/pvaccess/SynchronizedQueue.h +++ b/src/pvaccess/SynchronizedQueue.h @@ -287,6 +287,7 @@ void SynchronizedQueue::push(const T& t, double timeout) int size = std::queue::size(); if (maxLength <= 0 || size < maxLength) { pushUnsynchronized(t); + return; } // Clear pop event. itemPoppedEvent.tryWait(); diff --git a/src/pvaccess/pvaccess.PvTimeStamp.cpp b/src/pvaccess/pvaccess.PvTimeStamp.cpp index c52fd9c..b74218a 100644 --- a/src/pvaccess/pvaccess.PvTimeStamp.cpp +++ b/src/pvaccess/pvaccess.PvTimeStamp.cpp @@ -20,8 +20,8 @@ class_ >("PvTimeStamp", "**PvTimeStamp(time)**\n\n" "\t:Parameter: *time* (float) - time represented as float, including seconds and fractional seconds\n\n" "\t::\n\n" - "\t\ttimeStamp2 = PvTimeStamp(1234567890.00123)\n\n" - "**PvTimeStamp(secondsPastEpoch, nanoseconds [, userTag=-1])**\n\n" + "\t\ttimeStamp2 = PvTimeStamp(1234567890.00123 [, userTag=0])\n\n" + "**PvTimeStamp(secondsPastEpoch, nanoseconds [, userTag=0])**\n\n" "\t:Parameter: *secondsPastEpoch* (long) - seconds past epoch\n\n" "\t:Parameter: *nanoseconds* (int) - nanoseconds\n\n" "\t:Parameter: *userTag* (int) - user tag\n\n" @@ -32,6 +32,8 @@ class_ >("PvTimeStamp", .def(init()) + .def(init()) + .def(init()) .def(init())