AsyncMachine transitions aren't atomic #640

Open
cybergrind opened this issue Oct 24, 2023 · 4 comments
@cybergrind

Thank you for the library!

Describe the bug

It looks like several parallel tasks can successfully trigger the same transition more than once, so multiple parallel tasks may end up running the same transition chain in parallel.

#!/usr/bin/env python3
import asyncio
import logging
from contextlib import suppress

from transitions.extensions.asyncio import AsyncMachine


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
log = logging.getLogger('trans')
_global = 0


async def before_b(*args):
    pass


async def on_exc(event):
    log.exception(f'During trans: {event}')
    raise event.error


machine = AsyncMachine(
    states=['A', 'B', 'C'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'before': [before_b]},
        {'trigger': 'go_c', 'source': 'B', 'dest': 'C', 'before': [before_b]},
    ],
    initial='A',
    send_event=True,
    on_exception=on_exc,
)


async def async_package():
    global _global
    with suppress(Exception):
        await machine.go_b()
        await asyncio.sleep(0.01)
        await machine.go_c()
        await asyncio.sleep(0.01)
        _global += 1


async def arun():
    log.debug('before go_b')
    coros = [async_package() for _ in range(100)]
    # asyncio.wait() needs tasks rather than bare coroutines on Python 3.11+
    await asyncio.wait([asyncio.create_task(coro) for coro in coros])
    log.info(f'{_global=}')
    assert _global == 1, f'_global must be 1 vs {_global}'


def main():
    asyncio.run(arun())


if __name__ == '__main__':
    main()

Expected behavior

If one task is able to trigger a transition, other tasks shouldn't be able to trigger the same transition again.
Checking the current state and moving to the new one could perhaps happen inside a critical section, which would prevent multiple successful transitions from a state that is no longer valid for that transition.
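
A minimal sketch of that kind of critical section at the call site, assuming a plain asyncio.Lock shared by the calling tasks (the guarded_go_b helper and the state re-check are illustrative, not part of the transitions API):

#!/usr/bin/env python3
# Sketch only: serialize the state check and the trigger behind one shared lock.
import asyncio

transition_lock = asyncio.Lock()


async def guarded_go_b(machine):
    async with transition_lock:
        # Re-check the state while holding the lock; only the first task sees 'A'
        # and runs the transition, later tasks see 'B' (or later) and skip.
        if machine.state == 'A':
            return await machine.go_b()
        return False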

@aleneum
Member

aleneum commented May 24, 2024

Hello @cybergrind,

thank you for providing this MRE. It's a good test case for parallel execution of triggers.

The possibility that new events may cancel ongoing events is actually a feature and not a bug (see this comment and issue for more info). If you want events to be processed sequentially, you can pass queued=True to the machine constructor.

I altered your code a bit: when you pass queued=True, all triggers return True, because whether a transition will actually be successful is only known AFTER all queued transitions have been processed. Furthermore, I added ignore_invalid_triggers=True to suppress raised exceptions. I rely on after callbacks to increase _global since those callbacks will only be executed if a transition is successful. The code below will increase _global twice: once for go_b and once for go_c. This means each transition was successful exactly once, which is probably what you were looking for.

#!/usr/bin/env python3
import asyncio

from transitions.extensions.asyncio import AsyncMachine

_global = 0


async def before_b(event_data):
    pass

def inc_global(event_data):
    global _global
    _global += 1


machine = AsyncMachine(
    states=['A', 'B', 'C'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'before': [before_b], 'after': [inc_global]},
        {'trigger': 'go_c', 'source': 'B', 'dest': 'C', 'before': [before_b], 'after': [inc_global]},
    ],
    initial='A',
    send_event=True,
    ignore_invalid_triggers=True,
    queued=True,
)


async def async_package():
    await machine.go_b()
    await asyncio.sleep(0.01)
    await machine.go_c()

async def arun():
    coros = [async_package() for _ in range(100)]
    # asyncio.wait() needs tasks rather than bare coroutines on Python 3.11+
    await asyncio.wait([asyncio.create_task(coro) for coro in coros])
    print(_global)
    assert _global == 2, f'_global must be 2 vs {_global}'


def main():
    asyncio.run(arun())


if __name__ == '__main__':
    main()

@aleneum
Member

aleneum commented May 24, 2024

I will close this for now since it is probably no longer relevant for you. If this issue still concerns you, feel free to comment and I will reopen it.

@aleneum aleneum closed this as completed May 24, 2024
@cybergrind
Author

Using before and after eventually gave us very unclean code: with a moderately complex FSM it was hard to debug and support for a small team (it was fine while there was only one person, but broke down when we added more people).

With the current async design we spent probably one or two weeks pinpointing the issue (there was a lot going on in our code, to be honest), so if you ever need a reason to change the default behavior, here is one more in favor 😁

We have switched to the synchronous version in our case.

Thank you for a great library!

@aleneum
Member

aleneum commented May 27, 2024

Hello @cybergrind,

thank you for taking the time to respond. I am sorry to hear that you had so much trouble with transitions.

I guess the Readme should make this behaviour more obvious and provide ways to process events without race conditions. queued=True would be one approach, but using a lock could also work. If transitions trigger other transitions, a naive lock could end in a deadlock when a task tries to acquire a lock it already holds. I haven't tested this in depth, but maybe relying on an unset context in a new task is good enough for a LockedAsyncEvent:

#!/usr/bin/env python3
import asyncio

from transitions import MachineError
from transitions.extensions.asyncio import AsyncMachine, AsyncEvent


class Model:

    def __init__(self):
        self.enter_b_counter = 0
        self.enter_c_counter = 0
        self.errors = 0

    async def on_enter_B(self):
        self.enter_b_counter += 1
        # deadlock test
        await self.to_D()

    async def on_enter_C(self):
        self.enter_c_counter += 1


async def check():
    await asyncio.sleep(0.1)
    return True


class LockedAsyncEvent(AsyncEvent):
    lock = asyncio.Lock()

    async def trigger(self, model, *args, **kwargs):
        if self.machine.current_context.get() is not None:
            return await super(LockedAsyncEvent, self).trigger(model, *args, **kwargs)
        else:
            # without the previous check this could cause deadlocks when callbacks trigger further transitions
            async with self.lock:  
                return await super(LockedAsyncEvent, self).trigger(model, *args, **kwargs)


AsyncMachine.event_cls = LockedAsyncEvent

model = Model()
machine = AsyncMachine(
    model=model,
    states=['A', 'B', 'C', 'D'],
    transitions=[
        {'trigger': 'go_b', 'source': 'A', 'dest': 'B', 'conditions': [check]},
        {'trigger': 'go_c', 'source': 'D', 'dest': 'C'},
    ],
    initial='A',
)


async def async_package():
    try:
        await model.go_b()
        await asyncio.sleep(1)
        await model.go_c()
    except MachineError:
        model.errors += 1


async def arun():
    await asyncio.gather(*[async_package() for _ in range(100)])
    assert model.enter_b_counter == 1, f"Counter B was {model.enter_b_counter}"
    assert model.enter_c_counter == 1, f"Counter C was {model.enter_c_counter}"
    assert model.errors == 99, f"Error count was {model.errors}"


asyncio.run(arun())

Furthermore, if before and after are not the right place for callbacks, there are also on_enter_<state> and on_exit_<state> callbacks, which do not need to be defined on the model but can be passed in state configs. Instead of states=['A'] one could write states=[{'name': 'A', 'on_enter': my_callback}]; see the sketch below.
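
A minimal sketch of that state-config style, assuming an async callback named notify_entered_b (a placeholder name for this illustration):

#!/usr/bin/env python3
# Sketch of attaching callbacks via state configs instead of 'before'/'after'.
# 'notify_entered_b' is a placeholder callback name for this illustration.
import asyncio

from transitions.extensions.asyncio import AsyncMachine


async def notify_entered_b(event_data):
    print(f'entered B via {event_data.event.name}')


machine = AsyncMachine(
    states=[
        'A',
        {'name': 'B', 'on_enter': [notify_entered_b]},  # callback configured on the state itself
        'C',
    ],
    transitions=[{'trigger': 'go_b', 'source': 'A', 'dest': 'B'}],
    initial='A',
    send_event=True,
)

asyncio.run(machine.go_b())

This keeps the side effect attached to the state rather than repeated on every transition that leads into it.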

@aleneum aleneum reopened this Jun 10, 2024