Skip to content

Commit

Permalink
add aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ahab94 committed Nov 18, 2020
1 parent 58b8c72 commit c5d69e3
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 41 deletions.
60 changes: 40 additions & 20 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import (
"context"
"errors"
"fmt"

"github.com/ahab94/engine"
uuid "github.com/satori/go.uuid"
)

// Concurrent is an executor for concurrent executions
type Concurrent struct {
executor
engine *engine.Engine
block bool
successHandler func()
failHandler func(err error)
engine *engine.Engine
successHandler func()
failHandler func(err error)
aggregateInput chan chan bool
aggregateOutput chan bool
dispatchComplete bool
block bool
}

type ConcurrentOption func(*Concurrent)
Expand All @@ -27,8 +29,10 @@ func NewConcurrent(ctx context.Context, engine *engine.Engine, completionBlock b
id: fmt.Sprintf("%s-%s", "concurrent", uuid.NewV4().String()),
ctx: ctx,
},
engine: engine,
block: completionBlock,
aggregateInput: make(chan chan bool, engine.WorkerCount()),
aggregateOutput: make(chan bool),
engine: engine,
block: completionBlock,
}

for _, opt := range opts {
Expand Down Expand Up @@ -58,29 +62,22 @@ func (c *Concurrent) Execute() error {
return err
}

go c.aggregate()

return c.executeDispatch()
}

func (c *Concurrent) executeDispatch() error {
doneChans := make([]<-chan bool, 0)
for _, exec := range c.executables {
if !exec.IsCompleted() {
done := c.engine.Do(exec)
doneChans = append(doneChans, done)
c.aggregateInput <- c.engine.Do(exec)
}
}

if c.block {
success := true
for _, done := range doneChans {
if !success {
<-done
continue
}
success = <-done
}
c.dispatchComplete = true

if !success {
if c.block {
if !<-c.aggregateOutput {
return errors.New("failed to execute all tasks")
}
}
Expand All @@ -103,3 +100,26 @@ func (c *Concurrent) OnFailure(err error) {
c.failHandler(err)
}
}

func (c *Concurrent) aggregate() {
aggRes := true

for {
select {
case agg := <-c.aggregateInput:
log(c.id).Debugf("awaiting result %p", agg)
if !aggRes {
<-agg
continue
}

aggRes = <-agg
default:
if c.dispatchComplete {
log(c.id).Debugf("stopping aggregration...")
c.aggregateOutput <- aggRes
return
}
}
}
}
20 changes: 2 additions & 18 deletions concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,6 @@ func TestConcurrent_Execute(t *testing.T) {
},
wantErr: false,
},
{
name: "success - work all tasks - expect incomplete",
fields: fields{
executables: []Executable{
&testTask{
ID: 1,
Fail: true,
Delay: "2s",
}, &testTask{
ID: 2,
Fail: false,
Delay: "100ms",
},
},
completion: false,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -70,6 +52,8 @@ func BenchmarkConcurrent_Execute(b *testing.B) {
e := engine.NewEngine(context.TODO())
e.Start(100)
tasks := nTasks(1000)

b.Logf("goroutines before adding tasks %d", runtime.NumGoroutine())
c := NewConcurrent(context.TODO(), e, true)
for _, task := range tasks {
c.Add(task)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ahab94/flash
go 1.12

require (
github.com/ahab94/engine v0.1.2
github.com/ahab94/engine v0.1.3
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.2
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/ahab94/engine v0.1.2 h1:p4VsZkyfwpQE7Rw/GLK5NMKsL1XnueX9SQvUTA2mgf0=
github.com/ahab94/engine v0.1.2/go.mod h1:HqIdeM55aBhvAd5qfi0Pqkzco7BphSvN9VNIh4v95uU=
github.com/ahab94/engine v0.1.3 h1:4DAJEWAeJaPjE6h0TmNOLEARCNylPJ+ROSX3AruckWg=
github.com/ahab94/engine v0.1.3/go.mod h1:HqIdeM55aBhvAd5qfi0Pqkzco7BphSvN9VNIh4v95uU=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
Expand Down

0 comments on commit c5d69e3

Please sign in to comment.