diff --git a/pkg/infradb/subscriberframework/eventbus/event_bus_test.go b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go new file mode 100644 index 00000000..d50418fc --- /dev/null +++ b/pkg/infradb/subscriberframework/eventbus/event_bus_test.go @@ -0,0 +1,184 @@ +package eventbus + +import ( + "log" + "sync" + "testing" + "time" +) + +type moduleCiHandler struct { + receivedEvents []*ObjectData + sync.Mutex +} + +// Constants +const ( + TestEvent = "testEvent" + TestEventpriority = "testEventpriority" + TestEventChBusy = "testEventChBusy" + TestEventUnsub = "testEventUnsub" +) + +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *ObjectData) { + h.Lock() + defer h.Unlock() + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case TestEvent: + case TestEventpriority: + case TestEventChBusy: + case TestEventUnsub: + log.Printf("received event type %s", eventType) + default: + log.Panicf("error: Unknown event type %s", eventType) + } +} + +func TestSubscribeAndPublish(t *testing.T) { + handler := &moduleCiHandler{} + + EventBus := NewEventBus() + EventBus.StartSubscriber("testModule", TestEvent, 1, handler) + time.Sleep(10 * time.Millisecond) + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + + subscribers := EventBus.GetSubscribers(TestEvent) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEvent'") + } + subscriber := subscribers[0] + + err := EventBus.Publish(objectData, subscriber) + if err != nil { + t.Errorf("Publish() failed with error: %v", err) + } + + time.Sleep(10 * time.Millisecond) + handler.Lock() + if len(handler.receivedEvents) != 1 { + t.Errorf("Event was not received by the handler as expected") + } + if handler.receivedEvents[0] != objectData { + t.Errorf("Event data was not received by the handler as expected") + } + handler.Unlock() + + EventBus.Unsubscribe(subscriber) +} + +func TestPriorityOrderWithStartSubscriber(t *testing.T) { + handler1 := &moduleCiHandler{} + handler2 := &moduleCiHandler{} + + EventBus := NewEventBus() + + EventBus.StartSubscriber("testModule1", TestEventpriority, 2, handler1) + EventBus.StartSubscriber("testModule2", TestEventpriority, 1, handler2) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventpriority) + if len(subscribers) != 2 { + t.Errorf("Expected 2 subscribers, got %d", len(subscribers)) + } + if subscribers[0].Priority > subscribers[1].Priority { + t.Errorf("Subscribers are not sorted by priority") + } + + for _, sub := range subscribers { + EventBus.Unsubscribe(sub) + } +} +func TestPublishChannelBusyWithStartSubscriber(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleChBusy", TestEventChBusy, 1, handler) + + time.Sleep(10 * time.Millisecond) + + subscribers := EventBus.GetSubscribers(TestEventChBusy) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventChBusy'") + } + subscriber := subscribers[0] + + subscriber.Ch <- &ObjectData{} + + objectData := &ObjectData{ + ResourceVersion: "v1", + Name: "testObject", + NotificationID: "123", + } + err := EventBus.Publish(objectData, subscriber) + if err == nil { + t.Errorf("Expected an error when publishing to a busy channel, but got nil") + } + + EventBus.Unsubscribe(subscriber) +} +func TestUnsubscribeEvent(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.UnsubscribeEvent(subscriber, TestEventUnsub) + + subscribers = EventBus.GetSubscribers(TestEventUnsub) + for _, sub := range subscribers { + if sub == subscriber { + t.Errorf("Subscriber was not successfully unsubscribed") + } + } +} + +func TestUnsubscribe(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleUnsub", TestEventUnsub, 1, handler) + + subscribers := EventBus.GetSubscribers(TestEventUnsub) + if len(subscribers) == 0 { + t.Errorf("No subscribers found for event type 'testEventUnsub'") + } + subscriber := subscribers[0] + + EventBus.Unsubscribe(subscriber) + + select { + case _, ok := <-subscriber.Ch: + if ok { + t.Errorf("Subscriber's channel should be closed, but it's not") + } + default: + } +} + +func TestSubscriberAlreadyExist(t *testing.T) { + handler := &moduleCiHandler{} + EventBus := NewEventBus() + EventBus.StartSubscriber("testModuleSubExist", "testEventSubExist", 3, handler) + + exists := EventBus.subscriberExist("testEventSubExist", "testModuleSubExist") + if !exists { + t.Errorf("subscriberExist should return true for existing subscriber") + } + + subscribers := EventBus.GetSubscribers("testEventSubExist") + for _, sub := range subscribers { + if sub.Name == "testModuleSubExist" { + EventBus.Unsubscribe(sub) + } + } +} diff --git a/pkg/infradb/subscriberframework/eventbus/eventbus.go b/pkg/infradb/subscriberframework/eventbus/eventbus.go index 72922020..21e9cd1b 100644 --- a/pkg/infradb/subscriberframework/eventbus/eventbus.go +++ b/pkg/infradb/subscriberframework/eventbus/eventbus.go @@ -52,10 +52,13 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e for { select { case event := <-subscriber.Ch: - log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) + log.Printf("\nSubscriber %s for %s received \n", moduleName, eventType) handlerKey := moduleName + "." + eventType - if handler, ok := e.eventHandlers[handlerKey]; ok { + e.subscriberL.Lock() + handler, ok := e.eventHandlers[handlerKey] + e.subscriberL.Unlock() + if ok { if objectData, ok := event.(*ObjectData); ok { handler.HandleEvent(eventType, objectData) } else { @@ -65,6 +68,7 @@ func (e *EventBus) StartSubscriber(moduleName, eventType string, priority int, e } else { subscriber.Ch <- "error: no event handler found" } + case <-subscriber.Quit: close(subscriber.Ch) return diff --git a/pkg/infradb/taskmanager/taskmanager_test.go b/pkg/infradb/taskmanager/taskmanager_test.go new file mode 100644 index 00000000..8a2830b8 --- /dev/null +++ b/pkg/infradb/taskmanager/taskmanager_test.go @@ -0,0 +1,288 @@ +package taskmanager + +import ( + "log" + "sync" + "testing" + "time" + + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/common" + "github.com/opiproject/opi-evpn-bridge/pkg/infradb/subscriberframework/eventbus" + "github.com/stretchr/testify/assert" +) + +var ( + retValMu sync.Mutex + retVal bool +) + +type moduleCiHandler struct { + receivedEvents []*eventbus.ObjectData +} + +func handleTestEvent(objectData *eventbus.ObjectData) { + name := "testTask" + objectType := "testType" + resourceVersion := "testVersion" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventCompSuccess(objectData *eventbus.ObjectData) { + name := "testTaskCompSuccess" + objectType := "testTypeCompSuccess" + resourceVersion := "testVersionCompSuccess" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventbusy(wg *sync.WaitGroup) { + name := "testTaskbusy" + objectType := "testTypebusy" + resourceVersion := "testVersionbusy" + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusSuccess, + } + + TaskMan.StatusUpdated(name, objectType, resourceVersion, "notificationID", dropTask, component) + + retValMu.Lock() + retVal = true + retValMu.Unlock() + + wg.Done() +} +func handletestEventTimeout() { + +} +func handletestNotificationIDNotMatching(objectData *eventbus.ObjectData) { + name := "testTaskNotificationIdNotMatching" + objectType := "testTypeNotificationIdNotMatching" + resourceVersion := "testVersionNotificationIdNotMatching" + dropTask := false + + component := &common.Component{ + Name: "testModuleNotificationIdNotMatching", + CompStatus: common.ComponentStatusSuccess, + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, "NotificationIdNotMatching", dropTask, component) + + time.Sleep(100 * time.Millisecond) + + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func handleTestEventError(objectData *eventbus.ObjectData) { + name := "testTaskEventError" + objectType := "testTypeEventError" + resourceVersion := "testVersionEventError" + + dropTask := false + + component := &common.Component{ + CompStatus: common.ComponentStatusError, + } + if component.Timer == 0 { + component.Timer = 2 * time.Second + } else { + component.Timer *= 2 + } + TaskMan.StatusUpdated(name, objectType, resourceVersion, objectData.NotificationID, dropTask, component) +} +func (h *moduleCiHandler) HandleEvent(eventType string, objectData *eventbus.ObjectData) { + h.receivedEvents = append(h.receivedEvents, objectData) + switch eventType { + case "testEvent": + handleTestEvent(objectData) + case "testEventCompSuccess": + handleTestEventCompSuccess(objectData) + case "testEventError": + handleTestEventError(objectData) + case "testEventTimeout": + handletestEventTimeout() + case "testEventNotificationIdNotMatching": + handletestNotificationIDNotMatching(objectData) + default: + log.Printf("LCI: error: Unknown event type %s", eventType) + } +} + +func TestCreateTask(t *testing.T) { + subscriber := &eventbus.Subscriber{ + Name: "testSubscriber", + Ch: make(chan interface{}), + Quit: make(chan bool), + Priority: 1, + } + subs := []*eventbus.Subscriber{subscriber} + tm := newTaskManager() + tm.StartTaskManager() + tm.CreateTask("testTask", "testType", "testVersion", subs) + + time.Sleep(100 * time.Millisecond) + + task := tm.taskQueue.Dequeue() + assert.NotNil(t, task) + assert.Equal(t, "testTask", task.name) + assert.Equal(t, "testType", task.objectType) + assert.Equal(t, "testVersion", task.resourceVersion) + assert.Equal(t, subs, task.subs) +} + +func TestCompSuccess(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + handler := &moduleCiHandler{} + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleCompSuccess", "testEventCompSuccess", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventCompSuccess") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskCompSuccess", "testTypeCompSuccess", "testVersionCompSuccess", subscribers) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTaskCompSuccess" { + t.Errorf("assert failed:") + } + default: + } +} + +func TestCompError(t *testing.T) { + var wg sync.WaitGroup + + TaskMan.StartTaskManager() + + handler := &moduleCiHandler{} + + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleEventError", "testEventError", 1, handler) + }() + + wg.Wait() + + subscribers := eventbus.EBus.GetSubscribers("testEventError") + + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventError'") + } + + TaskMan.CreateTask("testTaskEventError", "testTypeEventError", "testVersionEventError", subscribers) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task, "Task should not be nil") + assert.Equal(t, "testTaskEventError", task.name, "Task name should match") + assert.Equal(t, "testTypeEventError", task.objectType, "Task object type should match") + assert.Equal(t, "testVersionEventError", task.resourceVersion, "Task resource version should match") + assert.Equal(t, subscribers, task.subs, "Task subscribers should match") +} + +func TestTimeout(t *testing.T) { + var wg sync.WaitGroup + + handler := &moduleCiHandler{} + TaskMan.StartTaskManager() + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleTimeout", "testEventTimeout", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + subscribers := eventbus.EBus.GetSubscribers("testEventTimeout") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEventTimeout'") + } + TaskMan.CreateTask("testTaskTimeout", "testTypeTimeout", "testVersionTimeout", subscribers) + + time.Sleep(35 * time.Second) + + task := TaskMan.taskQueue.Dequeue() + + assert.NotNil(t, task) + assert.Equal(t, "testTaskTimeout", task.name) + assert.Equal(t, "testTypeTimeout", task.objectType) + assert.Equal(t, "testVersionTimeout", task.resourceVersion) + assert.Equal(t, subscribers, task.subs) +} + +func TestNotificationIdNotMatching(t *testing.T) { + var wg sync.WaitGroup + + // Start the TaskManager in a separate goroutine + wg.Add(1) + go func() { + defer wg.Done() + TaskMan.StartTaskManager() + }() + + handler := &moduleCiHandler{} + // Start the subscriber and wait for it to be ready + wg.Add(1) + go func() { + defer wg.Done() + eventbus.EBus.StartSubscriber("testModuleNotificationIdNotMatching", "testEventNotificationIdNotMatching", 1, handler) + }() + + // Wait for both the TaskManager and the subscriber to be started + wg.Wait() + + for i := 0; i < cap(TaskMan.taskStatusChan); i++ { + TaskMan.taskStatusChan <- &TaskStatus{} + } + + subscribers := eventbus.EBus.GetSubscribers("testEventNotificationIdNotMatching") + if len(subscribers) == 0 { + t.Fatalf("No subscribers found for event type 'testEvent'") + } + TaskMan.CreateTask("testTaskNotificationIdNotMatching", "testTypeNotificationIdNotMatching", "testVersionNotificationIdNotMatching", subscribers) + + time.Sleep(500 * time.Millisecond) + + select { + case task := <-TaskMan.taskQueue.channel: + if task.name == "testTask" { + t.Errorf("assert failed:") + } + default: + } + + for len(TaskMan.taskStatusChan) > 0 { + <-TaskMan.taskStatusChan + } +} + +func TestStatusUpdatedChannelNotAvailable(t *testing.T) { + var wg sync.WaitGroup + + wg.Add(1) + + go handleTestEventbusy(&wg) + + wg.Wait() + actualRetVal := retVal + + assert.Equal(t, true, actualRetVal) +}