Modelling SQS behaviour with non-atomic message processing #101

Open
tekumara opened this issue Oct 9, 2024 · 6 comments

@tekumara
Contributor

tekumara commented Oct 9, 2024

The following:

---
options:
    max_actions: 3
---


action Init:
    sqs_fifo = []
    db = 0

atomic action Send:
    sqs_fifo.append(record(score=5, message='Hello'))

atomic fair action ReceiveSQS:
    if sqs_fifo:
        process(sqs_fifo[0])
        oneof:
            sqs_fifo.pop(0)
            pass # Do nothing, i.e. in this case, don't remove the message

fair action Increment:
    v = db
    db = v + 1


func process(message):
    v = db
    db = v + 1

crashes with:

E1010 10:51:31.117696   73379 starlark.go:55] Error executing stmt: pop: index 0 out of range: empty list
panic: Line 18: Error executing statement: sqs_fifo.pop(0, )

If I use:

atomic func process(message):

it no longer crashes, because process can no longer interleave with other actions such as Increment.
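To see why the non-atomic version fails, here is a minimal Python sketch (not FizzBee) that models two concurrent ReceiveSQS invocations against a one-message queue. It assumes each receiver is a generator whose `yield` stands for the scheduling point inside a non-atomic process, models only the oneof branch that removes the message, and leaves Increment out, since in this model two receivers are enough to reproduce the failure. The names `receive`, `run` and `count_crashes` are hypothetical.

```python
from itertools import permutations


def receive(queue, atomic_process):
    """One ReceiveSQS invocation that always takes the 'remove' branch of oneof."""
    if not queue:
        return
    if not atomic_process:
        # Non-atomic process(): the scheduler may run another action here.
        yield
    queue.pop(0)  # raises IndexError if another receiver already popped


def run(schedule, atomic_process):
    """Replay one interleaving (a sequence of receiver ids); return 'ok' or 'crash'."""
    queue = ["Hello"]
    receivers = {0: receive(queue, atomic_process), 1: receive(queue, atomic_process)}
    for rid in schedule:
        gen = receivers.get(rid)
        if gen is None:
            continue  # this receiver has already finished
        try:
            next(gen)
        except StopIteration:
            del receivers[rid]
        except IndexError:
            return "crash"
    return "ok"


def count_crashes(atomic_process):
    # Each receiver takes at most two steps, so every interleaving
    # is a distinct permutation of [0, 0, 1, 1].
    schedules = set(permutations([0, 0, 1, 1]))
    return sum(run(s, atomic_process) == "crash" for s in schedules)


print(count_crashes(atomic_process=False))  # 4 of the 6 interleavings crash
print(count_crashes(atomic_process=True))   # 0
```

Removing the yield point (the analogue of `atomic func process`) closes the window between the `if sqs_fifo:` check and the `pop(0)`, so the second receiver always sees the queue already emptied.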

Is there a way to capture the reservation behaviour of SQS (i.e. a message can only be received by a single consumer at a time) while still keeping process non-atomic (because, in practice, my system does not perform process atomically)?

Thank you!

@tekumara
Contributor Author

To mimic SQS's reservation behaviour, I've limited the concurrency of ReceiveSQS:

action_options:
    ReceiveSQS:
        max_concurrent_actions: 1

Although the concurrency in my real system is higher, my consumers don't interact with each other, so I think this still models the system's state accurately... but I'm open to other ideas!

process has been left as non-atomic, so it can interleave with other actions.
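The effect of capping concurrency can be checked with another small Python sketch (again, not FizzBee). It explores every interleaving of two receivers by depth-first search over explicit program counters, with `max_concurrent` standing in for max_concurrent_actions. It assumes the limit only stops a new invocation from starting while others are in flight; the function `explore` and the program-counter encoding are hypothetical.

```python
NOT_STARTED, IN_FLIGHT, DONE = 0, 1, 2


def explore(max_concurrent, receivers=2):
    """Return (completed_runs, crashed_runs) over all interleavings."""
    completed = crashed = 0

    def dfs(queue, pcs):
        nonlocal completed, crashed
        in_flight = sum(pc == IN_FLIGHT for pc in pcs)
        runnable = [
            i for i, pc in enumerate(pcs)
            if pc == IN_FLIGHT or (pc == NOT_STARTED and in_flight < max_concurrent)
        ]
        if not runnable:
            completed += 1  # every receiver finished without an error
            return
        for i in runnable:
            q, p = list(queue), list(pcs)  # copy state for this branch
            if p[i] == NOT_STARTED:
                # The `if sqs_fifo:` check: a non-empty queue means process() starts.
                p[i] = IN_FLIGHT if q else DONE
            elif not q:
                crashed += 1  # pop(0) on an empty list
                continue
            else:
                q.pop(0)
                p[i] = DONE
            dfs(q, p)

    dfs(["Hello"], [NOT_STARTED] * receivers)
    return completed, crashed


print(explore(max_concurrent=2))  # (2, 4)
print(explore(max_concurrent=1))  # (2, 0)
```

With the cap at 1, a second receiver cannot start until the first has finished with the message, so no interleaving reaches a `pop(0)` on an empty queue.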

@jp-fizzbee
Collaborator

The fundamental issue is that, after processing a message, sqs_fifo.pop(0) removes whatever message is currently first in the queue. You should actually remove the message that was processed.

That's why you'll need a message id to identify the message that should be removed. (SQS requires the receipt handle you got from the receive request; for simplicity, you can just use the message id here.)

This ensures that, even if the same message is delivered to a receiver twice (probably because the visibility timeout expired), you delete only that message.

In Send (with next_id = 0 added to Init):

sqs_fifo.append(record(msg_id=next_id, score=5, message='Hello'))
next_id += 1

In ReceiveSQS:

msg = sqs_fifo[0]
process(msg)
sqs_fifo.remove(msg)
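The difference between deleting the head and deleting the received message shows up when a message is delivered twice. Here is a minimal Python sketch, assuming a queue of two messages and two receivers that both hold message 0 (as after a visibility-timeout redelivery). `Message`, `delete_head`, `delete_by_id` and `redelivery` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    msg_id: int
    message: str


def delete_head(queue, msg):
    queue.pop(0)  # original model: removes whatever is first, ignoring msg


def delete_by_id(queue, msg):
    queue.remove(msg)  # removes exactly the received message; ValueError if it is gone


def redelivery(delete):
    """Both receivers hold message 0; each deletes it after processing."""
    queue = [Message(0, "Hello"), Message(1, "Hello")]
    first, second = queue[0], queue[0]
    delete(queue, first)
    try:
        delete(queue, second)
    except ValueError:
        return queue, "second delete failed"
    return queue, "both deletes succeeded"


print(redelivery(delete_head))   # ([], 'both deletes succeeded'): message 1 was never processed
print(redelivery(delete_by_id))  # ([Message(msg_id=1, message='Hello')], 'second delete failed')
```

Deleting by id keeps message 1 in the queue, but the duplicate delete now fails instead of silently removing the wrong message. Without the `msg_id` field the two records would compare equal, and `remove` could not tell them apart.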

@tekumara
Contributor Author

tekumara commented Oct 11, 2024

I see... that would be closer to how SQS works, but I still have the same problem: if ReceiveSQS runs concurrently and process is non-atomic, two threads can receive the same message, and one of them deletes it before the other, e.g.:

---
options:
    max_actions: 3
---


action Init:
    sqs_fifo = []
    msg_id = 0
    db = []

atomic action Send:
    sqs_fifo.append(record(msg_id=msg_id, score=5, message='Hello'))
    msg_id += 1

atomic action ReceiveSQS:
    if sqs_fifo:
        msg = sqs_fifo[0]
        process(msg)
        oneof:
            sqs_fifo.remove(msg)
            pass # Do nothing, i.e. in this case, don't remove the message

fair action Increment:
    v = db
    v.append("i")
    db = v


func process(message):
    v = db
    v.append("p")
    db = v

now errors with:

E1011 20:33:27.726531   87310 starlark.go:55] Error executing stmt: remove: element not found
panic: Line 21: Error executing statement: sqs_fifo.remove(msg, )

@jp-fizzbee
Collaborator

jp-fizzbee commented Oct 12, 2024

Sorry, I just got back from vacation.

Actually, you need to atomically check whether the first entry is still the message that was processed before removing it:

    oneof:
        atomic:
            if sqs_fifo and sqs_fifo[0] == msg:
                sqs_fifo.remove(msg)
        pass # Do nothing, i.e. in this case, don't remove the message

Alternatively, that could be expressed in a single line:

    oneof:
        sqs_fifo.remove(msg) if sqs_fifo and sqs_fifo[0] == msg else None
        pass # Do nothing, i.e. in this case, don't remove the message
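Whether the guarded delete also prevents double receipt can be checked with a small Python sketch (not FizzBee). It assumes two receivers that read the head, record it as processed, hit one scheduling point inside the non-atomic process, and then run the check-and-remove as a single step; only the remove branch of oneof is modelled. `receive` and `run` are hypothetical names.

```python
from itertools import permutations


def receive(queue, processed):
    if not queue:
        return
    msg = queue[0]          # msg = sqs_fifo[0]
    processed.append(msg)   # process(msg) starts
    yield                   # non-atomic process(): another receiver may run here
    # Guarded delete, treated as one atomic step.
    if queue and queue[0] == msg:
        queue.remove(msg)


def run(schedule):
    """Replay one interleaving of two receivers; return (final queue, processed)."""
    queue = [("msg", 0)]
    processed = []
    receivers = {0: receive(queue, processed), 1: receive(queue, processed)}
    for rid in schedule:
        gen = receivers.get(rid)
        if gen is None:
            continue
        try:
            next(gen)
        except StopIteration:
            del receivers[rid]
    return queue, processed


results = [run(s) for s in set(permutations([0, 0, 1, 1]))]
print(all(queue == [] for queue, _ in results))              # True: no crash, message deleted
print(sum(len(processed) == 2 for _, processed in results))  # 4: processed twice
```

No interleaving raises an error, but in four of the six the same message is processed by both receivers. In this model, ruling that out requires that a receiver never sees a message while another receiver holds it, for example via max_concurrent_actions: 1 as suggested earlier in the thread.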


@jp-fizzbee
Collaborator

Does this answer your question?

@tekumara
Contributor Author

I see... so that avoids the error during deletion, but wouldn't it still allow two concurrent actions to receive the same message? To avoid that, I guess I still need max_concurrent_actions: 1.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants