Skip to content

Commit

Permalink
Print out progress state as and download progresses
Browse files Browse the repository at this point in the history
  • Loading branch information
kishaningithub committed Mar 8, 2020
1 parent 90baece commit aae4d41
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 11 deletions.
12 changes: 11 additions & 1 deletion cmd/shopify-csv-download/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@ func main() {
exitOnFailure(fmt.Sprintf("unable to parse url %s", productsJsonURL), err)
logWithNewLine("Downloading products as CSV...")
watch := stopwatch.Start()
err = products.SaveAsImportableCSV(*productsJsonURL, os.Stdout)
err = products.SaveAsImportableCSVWithProgressState(*productsJsonURL, os.Stdout, progressHandler)
exitOnFailure("unable to write products", err)
logWithNewLine("")
watch.Stop()
logWithNewLine("Save complete. Time taken %s", watch.String())
}

func progressHandler(state products.ProgressState) {
progressStateLineFormat := "Products downloaded: %d Products converted as CSV: %d"
logInTheSameLine(progressStateLineFormat, state.NoOfProductsDownloaded, state.NoOfProductsConvertedAsCSV)
}

func logWithNewLine(format string, args ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, args...)
_, _ = fmt.Fprintln(os.Stderr)
}

func logInTheSameLine(format string, args ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, "\r"+format, args...)
}

func findProductsJsonURL() string {
remainingArgs, err := flags.Parse(&opts)
exitOnFailure("unable to parse flags", err)
Expand Down
11 changes: 9 additions & 2 deletions internal/products/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ func Stream(productsResourceFullUrl url.URL) stream.ProductStream {
maxNoOfRecords := 250
products := make(chan shopify.Product, maxNoOfRecords*3)
errors := make(chan error)
noOfProductsDownloaded := make(chan int, maxNoOfRecords*3)

go func() {
defer close(errors)
defer close(products)
defer close(noOfProductsDownloaded)
pageNo := 1
noOfProductsDownloadedCounter := 0
for {
productResponse, err := resource.GetProducts(maxNoOfRecords, pageNo)
if err != nil {
Expand All @@ -24,14 +28,17 @@ func Stream(productsResourceFullUrl url.URL) stream.ProductStream {
if len(productResponse.Products) == 0 {
return
}
noOfProductsDownloadedCounter += len(productResponse.Products)
noOfProductsDownloaded <- noOfProductsDownloadedCounter
for _, product := range productResponse.Products {
products <- product
}
pageNo++
}
}()
return stream.ProductStream{
Products: products,
Errors: errors,
Products: products,
Errors: errors,
NoOfProductsDownloaded: noOfProductsDownloaded,
}
}
14 changes: 11 additions & 3 deletions internal/products/stream/product.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,29 @@ import (
)

type ProductStream struct {
Products <-chan shopify.Product
Errors <-chan error
Products <-chan shopify.Product
Errors <-chan error
NoOfProductsDownloaded <-chan int
}

func (stream ProductStream) ConvertToCSV() ProductCSVStream {
productCSV := make(chan shopify.ProductCSV)
noOfProductsConvertedAsCSV := make(chan int)
go func() {
defer close(productCSV)
defer close(noOfProductsConvertedAsCSV)
noOfProductsConvertedAsCSVCounter := 0
for product := range stream.Products {
for _, csv := range convertor.ConvertToCSVFormat(product) {
productCSV <- csv
}
noOfProductsConvertedAsCSVCounter += 1
noOfProductsConvertedAsCSV <- noOfProductsConvertedAsCSVCounter
}
}()
return ProductCSVStream{
productCSV: productCSV,
ParentStream: stream,
ProductCSV: productCSV,
NoOfProductsConvertedAsCSV: noOfProductsConvertedAsCSV,
}
}
17 changes: 13 additions & 4 deletions internal/products/stream/productcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@ import (
)

type ProductCSVStream struct {
productCSV <-chan shopify.ProductCSV
ProductCSV <-chan shopify.ProductCSV
NoOfProductsConvertedAsCSV <-chan int
ParentStream ProductStream
}

func (stream ProductCSVStream) Save(out io.Writer) error {
func (stream ProductCSVStream) Save(out io.Writer) (ProgressState, <-chan error) {
csvWriter := gocsv.DefaultCSVWriter(out)
defer csvWriter.Flush()
channel := stream.removeTypeFromProductCSVChannel(stream.productCSV)
return gocsv.MarshalChan(channel, csvWriter)
err := make(chan error)
go func() {
channel := stream.removeTypeFromProductCSVChannel(stream.ProductCSV)
err <- gocsv.MarshalChan(channel, csvWriter)
}()
return ProgressState{
NoOfProductsDownloaded: stream.ParentStream.NoOfProductsDownloaded,
NoOfProductsConvertedAsCSV: stream.NoOfProductsConvertedAsCSV,
}, err
}

func (stream ProductCSVStream) removeTypeFromProductCSVChannel(productCSV <-chan shopify.ProductCSV) <-chan interface{} {
Expand Down
24 changes: 24 additions & 0 deletions internal/products/stream/progress_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package stream

import "sync"

type ProgressState struct {
NoOfProductsDownloaded <-chan int
NoOfProductsConvertedAsCSV <-chan int
}

func (s ProgressState) Ignore() {
var waitGroup sync.WaitGroup
waitGroup.Add(2)
go func() {
for range s.NoOfProductsDownloaded {
}
waitGroup.Done()
}()
go func() {
for range s.NoOfProductsConvertedAsCSV {
}
waitGroup.Done()
}()
waitGroup.Wait()
}
10 changes: 10 additions & 0 deletions pkg/products/progress_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package products

// ProgressState comprises of progress state that can be used for displaying the consumer the current state of the operation
type ProgressState struct {
NoOfProductsDownloaded int
NoOfProductsConvertedAsCSV int
}

// ProgressHandler used as callback to process the progress state as the process happens
type ProgressHandler func(state ProgressState)
48 changes: 47 additions & 1 deletion pkg/products/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,59 @@ package products

import (
"github.com/kishaningithub/shopify-csv-download/internal/products"
"github.com/kishaningithub/shopify-csv-download/internal/products/stream"
"io"
"net/url"
)

// SaveAsImportableCSV saves products from the given url to given writer
func SaveAsImportableCSV(productsJsonURL url.URL, out io.Writer) error {
return products.
progressState, err := products.
Stream(productsJsonURL).
ConvertToCSV().
Save(out)
progressState.Ignore()
return <-err
}

// SaveAsImportableCSVWithProgressState saves products from the given url to given writer and returns the progress state using channels
func SaveAsImportableCSVWithProgressState(productsJsonURL url.URL, out io.Writer, onProgressHandler ProgressHandler) error {
progressState, err := products.
Stream(productsJsonURL).
ConvertToCSV().
Save(out)
onProgressChange(progressState, onProgressHandler)
return <-err
}

func onProgressChange(progress stream.ProgressState, onProgressHandler ProgressHandler) {
noOfProductsConvertedAsCSV := 0
noOfProductsDownloaded := 0
isNoOfProductsChannelOpen := false
isNoOfProductsConvertedAsCSVChannelOpen := false
for {
select {
case noOfProductsDownloaded, isNoOfProductsChannelOpen = <-progress.NoOfProductsDownloaded:
if !isNoOfProductsChannelOpen {
progress.NoOfProductsDownloaded = nil
break
}
onProgressHandler(ProgressState{
NoOfProductsDownloaded: noOfProductsDownloaded,
NoOfProductsConvertedAsCSV: noOfProductsConvertedAsCSV,
})
case noOfProductsConvertedAsCSV, isNoOfProductsConvertedAsCSVChannelOpen = <-progress.NoOfProductsConvertedAsCSV:
if !isNoOfProductsConvertedAsCSVChannelOpen {
progress.NoOfProductsConvertedAsCSV = nil
break
}
onProgressHandler(ProgressState{
NoOfProductsDownloaded: noOfProductsDownloaded,
NoOfProductsConvertedAsCSV: noOfProductsConvertedAsCSV,
})
}
if progress.NoOfProductsDownloaded == nil && progress.NoOfProductsConvertedAsCSV == nil {
break
}
}
}

0 comments on commit aae4d41

Please sign in to comment.