Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel-Beneyto committed Mar 18, 2019
1 parent 1f10fcc commit b62ce6a
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 1 deletion.
90 changes: 89 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,90 @@
# fofi-go
# fofi
Fan-out Fan-in Go example

We have a time/resource expensive job to do. In fact, we need to do these expensive jobs several times.<br>
Later, we want to process the output of these jobs and operate with the results.

We can run this within a loop, sequentially, something like:
```
for job in jobs:
output = run(job)
compute(output)
```

This would work, but `golang` provides very useful features to resolve this in a more performant way.

This example describes a `fan-out` `fan-in` pattern:

![fan-out fan-in](/docs/img/fan-out-fan-in.png)

Using `concurrency`, `parallelism` and `go channels` we can achieve a performant implementation easily.

Our main flow looks like this:

![main flow](/docs/img/fofi-main-flow.png)

Basically, we are spawning many producer processes (fan out), these producers are writing the output in a go channel. We are also spawning a single consumer process which is reading output go channel and processing the data (fan in).

Running `fofi.go` with `concurrentGoRoutines = 1`, to all effects, would be like running sequential solution:
```
$ time go run fofi.go
starting fofi processing
producer processing 5
consumer processing 5
producer wrote 5
producer processing 3
consumer processed 5
producer wrote 3
producer processing 2
consumer processing 3
consumer processed 3
producer wrote 2
producer processing 4
consumer processing 2
consumer processed 2
producer wrote 4
producer processing 1
consumer processing 4
consumer processed 4
producer wrote 1
consumer processing 1
consumer processed 1
consumer done
fofi processing finished
$ go run fofi.go 0.26s user 0.15s system 3% cpu 10.858 total
```

Running `fofi.go` with `concurrentGoRoutines = 5`:
```
time go run fofi.go
starting fofi processing
producer processing 5
producer processing 1
producer processing 3
producer processing 4
producer processing 2
producer wrote 4
consumer processing 4
producer wrote 3
producer wrote 5
producer wrote 2
producer wrote 1
consumer processed 4
consumer processing 1
consumer processed 1
consumer processing 3
consumer processed 3
consumer processing 2
consumer processed 2
consumer processing 5
consumer processed 5
consumer done
fofi processing finished
go run fofi.go 0.26s user 0.15s system 8% cpu 4.835 total
```

Caveats:
- Is important to close Consumer channel once all Producers have finished the work, otherwise we might produce a panic error
- We are using a channel to limit the number of Producer go routines running concurrently
- Producer channel size limits the number of concurrent Producer processes running
- Consumer channel size defines the # of elements that Producers can put there before become blocked. This would generate `backpressure` to Producers and this might be intended and beneficial or unintended and harmful
Binary file added docs/img/fan-out-fan-in.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/img/fofi-main-flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
72 changes: 72 additions & 0 deletions fofi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"fmt"
"strconv"
"sync"
"time"
)

var (
// concurrentGoRoutines defines the # of concurrent Producer go routines
// running at the same time.
concurrentGoRoutines = 5
)

// Producer simulates a slow and resource consuming job.
func Producer(input string, out chan int, lock chan bool, wg *sync.WaitGroup) {
defer func() {
wg.Done()
<-lock
}()
lock <- true
fmt.Printf("producer processing %s\n", input)

time.Sleep(2 * time.Second)

i, err := strconv.Atoi(input)
if err != nil {
fmt.Printf("error: producer couldn't process %s\n", input)
return
}
out <- i
fmt.Printf("producer wrote %s\n", input)
}

// Consumer simulates a fast and low resource consuming job.
func Consumer(in chan int, wg *sync.WaitGroup) {
defer func() {
wg.Done()
}()

for i := range in {
fmt.Printf("consumer processing %d\n", i)
time.Sleep(500 * time.Millisecond)
fmt.Printf("consumer processed %d\n", i)
}
fmt.Println("consumer done")
}

func main() {
sampleInputData := []string{"1", "2", "3", "4", "5"}

var consumerWg sync.WaitGroup
var producerWg sync.WaitGroup
consumerCh := make(chan int, concurrentGoRoutines)
producerCh := make(chan bool, concurrentGoRoutines)

consumerWg.Add(1)
go Consumer(consumerCh, &consumerWg)

fmt.Println("starting fofi processing")
for _, str := range sampleInputData {
producerWg.Add(1)
go Producer(str, consumerCh, producerCh, &producerWg)
}

producerWg.Wait()
close(consumerCh)
close(producerCh)
consumerWg.Wait()
fmt.Println("fofi processing finished")
}

0 comments on commit b62ce6a

Please sign in to comment.