Skip to content

Commit

Permalink
Merge pull request #175 from DiamondLightSource/fix_on_update_processing
Browse files Browse the repository at this point in the history
Ensure records do not get stuck in processing state
  • Loading branch information
AlexanderWells-diamond authored Sep 19, 2024
2 parents 5ce9345 + 7c886b7 commit 35bb833
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Changed:

- `AsyncioDispatcher cleanup tasks atexit <../../pull/138>`_
- `Ensure returned numpy arrays are not writeable <../../pull/164>`_
- `Ensure records do not get stuck in processing state <../../pull/175>`_

Fixed:

Expand Down
5 changes: 3 additions & 2 deletions softioc/asyncio_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ async def async_wrapper():
ret = func(*func_args)
if inspect.isawaitable(ret):
await ret
if completion:
completion(*completion_args)
except Exception:
logging.exception("Exception when running dispatched callback")
finally:
if completion:
completion(*completion_args)
asyncio.run_coroutine_threadsafe(async_wrapper(), self.loop)

def __enter__(self):
Expand Down
16 changes: 13 additions & 3 deletions softioc/cothread_dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import inspect
import logging

class CothreadDispatcher:
def __init__(self, dispatcher = None):
Expand Down Expand Up @@ -28,7 +30,15 @@ def __call__(
completion = None,
completion_args=()):
def wrapper():
func(*func_args)
if completion:
completion(*completion_args)
try:
func(*func_args)
except Exception:
logging.exception("Exception when running dispatched callback")
finally:
if completion:
completion(*completion_args)

assert not inspect.iscoroutinefunction(func)
assert not inspect.iscoroutinefunction(completion)

self.__dispatcher(wrapper)
130 changes: 130 additions & 0 deletions tests/test_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,136 @@ async def query_record(index):
if process.exitcode is None:
pytest.fail("Process did not terminate")


def blocking_test_func_broken_on_update(
self, device_name, conn, use_asyncio
):

builder.SetDeviceName(device_name)

count_rec = builder.longIn("BLOCKING-COUNTER", initial_value=0)

async def async_blocking_broken_on_update(new_val):
"""on_update function that always throws an exception"""
log("CHILD: blocking_broken_on_update starting")
completed_count = count_rec.get() + 1
count_rec.set(completed_count)
log(
f"CHILD: blocking_update_func updated count: {completed_count}",
)
raise Exception("on_update is broken!")

def sync_blocking_broken_on_update(new_val):
"""on_update function that always throws an exception"""
log("CHILD: blocking_broken_on_update starting")
completed_count = count_rec.get() + 1
count_rec.set(completed_count)
log(
f"CHILD: blocking_update_func updated count: {completed_count}",
)
raise Exception("on_update is broken!")

if use_asyncio:
on_update_callback = async_blocking_broken_on_update
else:
on_update_callback = sync_blocking_broken_on_update

builder.longOut(
"BLOCKING-BROKEN-ON-UPDATE",
on_update=on_update_callback,
always_update=True,
blocking=True
)

if use_asyncio:
dispatcher = asyncio_dispatcher.AsyncioDispatcher()
else:
dispatcher = None
builder.LoadDatabase()
softioc.iocInit(dispatcher)

conn.send("R") # "Ready"

log("CHILD: Sent R over Connection to Parent")

# Keep process alive while main thread runs CAGET
if not use_asyncio:
log("CHILD: Beginning cothread poll_list")
import cothread
cothread.poll_list([(conn.fileno(), cothread.POLLIN)], TIMEOUT)
if conn.poll(TIMEOUT):
val = conn.recv()
assert val == "D", "Did not receive expected Done character"

log("CHILD: Received exit command, child exiting")

@requires_cothread
@pytest.mark.asyncio
@pytest.mark.parametrize("use_asyncio", [True, False])
async def test_blocking_broken_on_update(self, use_asyncio):
"""Test that a blocking record with an on_update record that will
always throw an exception will not permanently block record processing.
Runs using both cothread and asyncio dispatchers in the IOC."""
ctx = get_multiprocessing_context()

parent_conn, child_conn = ctx.Pipe()

device_name = create_random_prefix()

process = ctx.Process(
target=self.blocking_test_func_broken_on_update,
args=(device_name, child_conn, use_asyncio),
)

process.start()

log("PARENT: Child started, waiting for R command")

from aioca import caget, caput

try:
# Wait for message that IOC has started
select_and_recv(parent_conn, "R")

log("PARENT: received R command")

assert await caget(device_name + ":BLOCKING-COUNTER") == 0

log("PARENT: BLOCKING-COUNTER was 0")

await caput(
device_name + ":BLOCKING-BROKEN-ON-UPDATE",
1,
wait=True,
timeout=TIMEOUT
)

assert await caget(device_name + ":BLOCKING-COUNTER") == 1

await caput(
device_name + ":BLOCKING-BROKEN-ON-UPDATE",
2,
wait=True,
timeout=TIMEOUT
)

assert await caget(device_name + ":BLOCKING-COUNTER") == 2


finally:
# Clear the cache before stopping the IOC stops
# "channel disconnected" error messages
aioca_cleanup()

log("PARENT: Sending Done command to child")
parent_conn.send("D") # "Done"
process.join(timeout=TIMEOUT)
log(f"PARENT: Join completed with exitcode {process.exitcode}")
if process.exitcode is None:
pytest.fail("Process did not terminate")


class TestGetSetField:
"""Tests related to get_field and set_field on records"""

Expand Down

0 comments on commit 35bb833

Please sign in to comment.