Skip to content

Commit

Permalink
Task scheduler sends map intents to worker manager
Browse files Browse the repository at this point in the history
  • Loading branch information
eagraf committed Apr 7, 2020
1 parent fe454d7 commit e657407
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
wm := workers.GetWorkerManager()
wm.Start()

ts := tasks.Start(taskRegistry)
ts := tasks.Start(taskRegistry, wm.MapTaskQueue)
ts.IntentQueue <- &newIntent

r := RegisterRoutes()
Expand Down
5 changes: 4 additions & 1 deletion tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type TaskScheduler struct {
}

// Start listening for intents originated by the API/scheduler
func Start(taskRegistry map[string]TaskType) *TaskScheduler {
func Start(taskRegistry map[string]TaskType, mapTaskQueue chan *Intent) *TaskScheduler {

// Create TaskScheduler
var ts = TaskScheduler{
Expand All @@ -38,6 +38,9 @@ func Start(taskRegistry map[string]TaskType) *TaskScheduler {
go ts.handleSetup(intent)
case "map":
fmt.Println("Map")
fmt.Println(mapTaskQueue)
mapTaskQueue <- intent
fmt.Println("qua")
// Handle map task
case "reduce":
fmt.Println("Reduce")
Expand Down
1 change: 1 addition & 0 deletions workers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var wmSingleton = WorkerManager{
Workers: make(map[string]Worker),
AvailableWorkers: make(chan Worker, 1024),
AllocatedWorkers: make(map[string][]Worker),
MapTaskQueue: make(chan *tasks.Intent),
workersMutex: sync.RWMutex{},
allocationMutex: sync.RWMutex{},
}
Expand Down

0 comments on commit e657407

Please sign in to comment.