-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(evpn-bridge): fix system behaviour for pending objects #391
base: main
Are you sure you want to change the base?
Conversation
86a66df
to
ae178ed
Compare
Signed-off-by: Dimitrios Markou <[email protected]>
ae178ed
to
85bde59
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #391 +/- ##
===========================================
- Coverage 50.77% 24.44% -26.33%
===========================================
Files 37 44 +7
Lines 2525 5674 +3149
===========================================
+ Hits 1282 1387 +105
- Misses 1114 4120 +3006
- Partials 129 167 +38 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we could create some unit tests which could prove that this approach work and free of race conditions. Go tests can be run with -race
flag to catch any race conditions:
https://go.dev/doc/articles/race_detector
@@ -128,10 +129,20 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool { | |||
} | |||
|
|||
// Publish api notifies the subscribers with certain eventType | |||
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) { | |||
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error { | |||
e.publishL.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... if write into a channel, why do we use read lock here? With the read lock many threads can publish at the same time. We apparently want to use Lock
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are right. The question though is if we actually need a Lock.
In a case where two goroutines try to write on the same channel I think golang is smart enough to block the second goroutine until the first one has finished writing. In that case we do not need any Lock. is that correct ?
WDYT ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we need the lock only to lock on the channel, then we can remove it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think the Lock is not needed here we can remove it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls remove then
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task) | ||
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task | ||
// so we do start again from the subscriber that returned an error or was unavailable for any reason. | ||
task.subIndex += i |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are at subscriber with index i
at the moment
if you use +=
here and you already have subIndex
as non-zero, won't you end up with a wrong index to start with next time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this will not happen as the next time we will take a sublist of the subscribers based on the subIndex. Then we will iterate on that sublist and that means that the i will start from 0 again.
check here: https://github.com/opiproject/opi-evpn-bridge/blob/main/pkg/infradb/taskmanager/taskmanager.go#L114
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds a bit error-prone
What if we, instead of index calculations, copy the rest of subscribers into a task, so it could continue where it stopped?
Any other means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest I like the solution as it is today. It looks cleaner to me. It is tested and it works so nice so I would not like to change it if that is ok with you. I can put a comment if you want so people can understand what this index calculation is all about. I do not like so much to be honest to keep sublists all the time for the remaining subscribers to me that is a bit more error prone. Maybe we can revisit this issue in the future as this bug fix here is not related to the subIndex. The SubIndex was allready there from the beggining
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me it is not. I already misinterpreted what is happening here. We could create an issue and discuss it there. I don't know if we can add a Subscriber in the middle and if we need to regard that a new one and other details
I can put a comment if you want so people can understand what this index calculation is all about.
Please do. I am also wondering if it could be a single place like a function to "re-queue task"
Maybe we can revisit this issue in the future as this bug fix here is not related to the subIndex. The SubIndex was allready there from the beggining
Please create an issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// We keep this subIndex in order to know from which subscriber to start iterating after the requeue of the Task | ||
// so we do start again from the subscriber that returned an error or was unavailable for any reason. | ||
task.subIndex += i | ||
task.systemTimer *= 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- maybe the name should be smth like
waitForRetry
? - Will we stop increasing timer if no one is listening for the published task on another end?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I would like to keep it as a system timer as it is more explicit if you compare it also with the Component.Timer which is provided by each component in case of Error :
time.AfterFunc(taskStatus.component.Timer, func() { - The timer will increase every time that we try to publish but the channel is busy and we need to reque the task. So it is working this way:
- We publish
- An error is returned because the channel is busy
- We increase the timer
- We wait for the timer to expire in a goroutine and then we requeue the task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flow you described is clear. I want to know if we ever stop increasing it. Will it make sense to wait for hours? days? years?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we never stop increasing it. But this is a known issue that we have just not addressed do far. We are planning to address this in the future. I can open an issue to track it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls create an issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
#394
task.systemTimer *= 2 | ||
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer) | ||
time.AfterFunc(task.systemTimer, func() { | ||
t.taskQueue.Enqueue(task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
if we do not receive an answer within reasonable time, does it mean that we have a problem and probably no one is handling our requests? and the best we can do is report an error and gracefully shutdown?
-
If we block for 30 seconds in this thread, can we handle other task responses? If we are blocked for 30 seconds, doesn't it mean that what you actually want is to wait for a response in another goroutine which could continue handling when the response is received?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This is not built for that. What we want here is that when we do not receive any answer because for whatever reason the subsciber is stuck we would like to requeue the task and try again because maybe the unresponsivness of the subscriber is temporary. Now if the subscriber is completely dead and cannot send a response back at all we need to put in plase some resiliency mechanisms to handle this case btu this is something to investigate and implement in the future currently we do not look into that corner case.
- We will not have multiple subscribers sending multiple responses at the same time. The system is designed this way where we send a task to a subscriber and then we just wait for that subsriber to respond before we move on to the next subscriber. You will not hve a case where we send in parallel multipe tasks to multiple subscribers and the subscribers will send in the same time multiple respnses back in parallel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now if the subscriber is completely dead and cannot send a response back at all we need to put in plase some resiliency mechanisms to handle this case btu this is something to investigate and implement in the future currently we do not look into that corner case.
Issue is needed
The system is designed this way where we send a task to a subscriber and then we just wait for that subsriber to respond before we move on to the next subscriber.
Sounds like a sequential flow. Do we need channels then? Pls consider. It would make all the things much simpler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think channels and notifications make the whole system scale better. The implementation that we have so far it just uses one task queue and one process that drains the queue. Maybe in the future if we hit any limitation we could use more processes to drain the queue of the tasks and I think this will scale easier when we are using channels and notifications. So I would prefer keeping it this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe in the future if we hit any limitation we could use more processes to drain the queue of the tasks and I think this will scale easier when we are using channels and notifications.
But we didn't hit, and we don't know if we really need it in the future, but already complicating the design.
If we need to scale, there are likely better approaches than this sequential one.
If we don't need to scale, then we are complicating the system for nothing at the moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I didn't get your question correct at the beggining. I apologize for this
The channels are not so much related to the sequential flow as to keep the core system as much as was possible agnostic to the plugins that we need to send and receive notifications. The plugins are responsible to configuring the underline system of their responsibility (e.g. FRRModule for FRR, LinuxModule for Linux etc...). The core system doesn't need to know what are those plugins or what they exactly do but the only thing that they need to know is towards which subscribers they need to send notifications. The channels help in this agnostic notion as well as to the communication between the different go routines as each subscriber runs as a different go routine.
Also the sequential flow it is there because the different plugins have some sort of dependency into eachother. That dependency is expressed by the sequential flow and that is why we want the first plugin to succeed before we move to the second one. Because for instance if the Linux Vendor Module wants to attach an interface into a bridge which bridge has been created before by General Linux Module if the sequential flow is not there the Linux Vendor module call will fail as it has a dependency to the General Linux module operation first to create the bridge.
This is a significant design choice which has been presented to the TSC before the implementation and we have decided that was resonable to move forward with it . Also we think that is a good design and works well so far and I do not really agree that the channels make the system complicated as they serve the architecture well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channels are not so much related to the sequential flow as to keep the core system as much as was possible agnostic to the plugins that we need to send and receive notifications.
The same level of agnosticism can be achieved with a function calling. The channels do not add more agnosticism here.
By the definition: Channels are the pipes that connect concurrent goroutines. You can send values into channels from one goroutine and receive those values into another goroutine.
as well as to the communication between the different go routines as each subscriber runs as a different go routine.
They can be run in different routines, even if we call them by a function (you can use mutex to sync or send to channel or execute right in the same thread context)
Also the sequential flow it is there because the different plugins have some sort of dependency into eachother. That dependency is expressed by the sequential flow and that is why we want the first plugin to succeed before we move to the second one.
We send a msg, and go to sleep waiting for a plugin to complete its job transforming our flow into sequential
Because for instance if the Linux Vendor Module wants to attach an interface into a bridge which bridge has been created before by General Linux Module if the sequential flow is not there the Linux Vendor module call will fail as it has a dependency to the General Linux module operation first to create the bridge.
What if we do not register General Linux Module at all so it won't receive notifications at all?
Also we think that is a good design and works well so far and I do not really agree that the channels make the system complicated as they serve the architecture well.
It might be. But from the chunk I see, it looks like you need a sequential flow and channels add complexity.
Apparently this PR is not the place to make such decisions, but pls consider
I have tested this code locally in order to check if the different corner cases that are resolved by this code here are working properly. Everything was working as it should be. The unit tests would be a very usefull tool to add in order not to test all the cases by hand every time but right now I do not have time to work on that so I will propose to open an issue so we can address into the future when I find some time if that is allright. |
Signed-off-by: Dimitrios Markou <[email protected]>
d581d3f
to
1a5cb19
Compare
if we leave it for an issue, it can stay there for a very long time, especially if no one have an incline for unit testing. Maybe someone could help you with that so we do not defer that? |
ok let me see what I can do about this |
Hello @artek-koltun I have one question regarding race detection. From what I understand from the link that you have provided I should not create any unit tests for catching race conditions but the only thing that I should do is to create some automatic github jobs that will build/run the programm with the --race parameter on (go build --race/ go run --race) is that correct or ? Can you please share your opinion on this ? |
We already run our uts with that flag However, take in mind that it is not a compile time tool. It is a runtime tool. We need to execute a chunk of code, which can have race issues. Thus, we need a dedicated unit test which covers the code with the issue. |
Hello @artek-koltun, ok so from what I understand the unit tests are independent of the --race condition. The --race condition checks for races in specific parts of the code which code the unit tests are checking. Right ? Also we have an agreeement to create unit tests no worries. We will create unit tests for the units (functions) that we are touching in the PR if that is allright |
Hej,
|
Hello @artek-koltun , What do you mean write a test that would fail with -race flag. I do not think that we have any race problems in the code today so we can catch them by unit test and fix them afterwards. What we are planning to so is that we will write unit tests for the code changes in this commit, we will run the tests with -race flag on and if in the meanwhile find any race problem then we will push a fix. I hope we are in the same page. |
I might have lost the context. Pls create ut to the behavior you are fixing. |
Signed-off-by: Dimitrios Markou <[email protected]>
5f9448b
to
747f26b
Compare
EventBus := NewEventBus() | ||
EventBus.StartSubscriber("testModule", "testEvent", 1, handler) | ||
|
||
time.Sleep(100 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to sleep here? To make sure we can Publish? But then it looks strange. Subscribed returned success, but we cannot use it properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep is just for allow time to get the go routine ready.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we are not lucky, and it is ready in 1 second? Using sleeps is bad. It is not reliable, and it makes the tests slow. At this moment, the execution time of some tests is ~159sec
Use chan or other synchronization mechanisms
case "testEventUnsub": | ||
log.Printf("received event type %s", eventType) | ||
default: | ||
log.Printf("error: Unknown event type %s", eventType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we fail here if any unsupported type received?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
time.Sleep(100 * time.Millisecond) | ||
handler.Lock() | ||
if len(handler.receivedEvents) != 1 || handler.receivedEvents[0] != objectData { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be split into 2 checks, otherwise it is hard to distinguish what went wrong
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
for _, sub := range subscribers { | ||
EventBus.Unsubscribe(sub) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will defer suit better here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defer wont help as all test have different leftovers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but in some of them? What does prevent to sue defer here?
} | ||
subs := []*eventbus.Subscriber{subscriber} | ||
|
||
TaskMan.StartTaskManager() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is better to use its own instance of a manager per test, then a global one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not addressed
|
||
time.Sleep(100 * time.Millisecond) | ||
|
||
task := TaskMan.taskQueue.Dequeue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need to touch the internals instead of waiting when TaskMan sends it to us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we are just checking for this case if task was created or not.
|
||
TaskMan.StartTaskManager() | ||
handler := &ModulelciHandler{} | ||
wg.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe you want to have this wg within StartSubscriber, so it exits only when we are listening for event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then we have to change the whole design. which will impact all the other modules/pkgs also ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is too simple to break anything, but we don't need to create such solutions like here
+
in this code, I am not sure that it is doing something meaningful here:
when StartSubscriber
exits, it does not mean that it started listening for incoming events -> you could remove this go routine and get the equivalent code, but with increased chances due to waiting for go routines.
We need something more reliable, like StartSubscriber
returns when it is already listening. Use chan or other sync mechanisms
select { | ||
case task := <-TaskMan.taskQueue.channel: | ||
if task.name == "testTaskCompSuccess" { | ||
t.Errorf("assert failed:") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what exactly happened here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
task dequeued, processed by comp and queue become empty as it was successful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we could test it without going so deep into the implementation details? By calling processTasks
?
wg.Wait() | ||
|
||
retValMu.Lock() | ||
actualRetVal := retVal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
atomic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i believe we can keep lock as its limited to this test only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You use mutex and workgroup to find out if your var was changed. I think it is overkill. Using a chan would be enough (you pass a chan into handleTestEventbusy
, you wait in select for chan be not empty or timeout, handleTestEventbusy
sends into chan when ready)
747f26b
to
542e151
Compare
Signed-off-by: atulpatel261194 <[email protected]>
542e151
to
f3e299f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
for _, sub := range subscribers { | ||
EventBus.Unsubscribe(sub) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but in some of them? What does prevent to sue defer here?
|
||
TaskMan.StartTaskManager() | ||
handler := &ModulelciHandler{} | ||
wg.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is too simple to break anything, but we don't need to create such solutions like here
+
in this code, I am not sure that it is doing something meaningful here:
when StartSubscriber
exits, it does not mean that it started listening for incoming events -> you could remove this go routine and get the equivalent code, but with increased chances due to waiting for go routines.
We need something more reliable, like StartSubscriber
returns when it is already listening. Use chan or other sync mechanisms
select { | ||
case task := <-TaskMan.taskQueue.channel: | ||
if task.name == "testTaskCompSuccess" { | ||
t.Errorf("assert failed:") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we could test it without going so deep into the implementation details? By calling processTasks
?
wg.Wait() | ||
|
||
retValMu.Lock() | ||
actualRetVal := retVal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You use mutex and workgroup to find out if your var was changed. I think it is overkill. Using a chan would be enough (you pass a chan into handleTestEventbusy
, you wait in select for chan be not empty or timeout, handleTestEventbusy
sends into chan when ready)
} | ||
subs := []*eventbus.Subscriber{subscriber} | ||
|
||
TaskMan.StartTaskManager() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not addressed
defer h.Unlock() | ||
h.receivedEvents = append(h.receivedEvents, objectData) | ||
switch eventType { | ||
case "testEvent": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line 101
func (h *moduleCiHandler) HandleEvent(eventType string, objectData *eventbus.ObjectData) {
h.receivedEvents = append(h.receivedEvents, objectData)
switch eventType {
case "testEvent":
handleTestEvent(objectData)
case "testEventCompSuccess":
handleTestEventCompSuccess(objectData)
case "testEventError":
handleTestEventError(objectData)
case "testEventTimeout":
handletestEventTimeout()
case "testEventNotificationIdNotMatching":
handletestNotificationIDNotMatching(objectData)
default:
log.Printf("LCI: error: Unknown event type %s", eventType)
}
}
t.Errorf("Publish() failed with error: %v", err) | ||
} | ||
|
||
time.Sleep(100 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand...
HandleEvent
is part of a test, and we are free to organize it as we need
The current execution time of the test is not appropriate: 159.629s and I think we could do better here instead of using sleeps
sync.Mutex | ||
} | ||
|
||
func (h *ModulelciHandler) HandleEvent(eventType string, objectData *ObjectData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check naming for other variables/functions. I saw some upper case function variable names
EventBus := NewEventBus() | ||
EventBus.StartSubscriber("testModule", "testEvent", 1, handler) | ||
|
||
time.Sleep(100 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we are not lucky, and it is ready in 1 second? Using sleeps is bad. It is not reliable, and it makes the tests slow. At this moment, the execution time of some tests is ~159sec
Use chan or other synchronization mechanisms
This is a fix related to the issue #388