diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index cc69dac1..8040e237 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -739,7 +739,7 @@ func (a *Aggregator) Start() error { err = a.streamClient.Start() if err != nil { - log.Fatalf("failed to start stream client, error: %v", err) + return fmt.Errorf("failed to start stream client, error: %w", err) } bookMark := &datastream.BookMark{ @@ -749,12 +749,12 @@ func (a *Aggregator) Start() error { marshalledBookMark, err := proto.Marshal(bookMark) if err != nil { - log.Fatalf("failed to marshal bookmark: %v", err) + return fmt.Errorf("failed to marshal bookmark: %w", err) } err = a.streamClient.ExecCommandStartBookmark(marshalledBookMark) if err != nil { - log.Fatalf("failed to connect to data stream: %v", err) + return fmt.Errorf("failed to connect to data stream: %w", err) } // A this point everything is ready, so start serving diff --git a/cmd/run.go b/cmd/run.go index af5ff7a4..477bd90e 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -101,6 +101,7 @@ func start(cliCtx *cli.Context) error { // start aggregator in a goroutine, checking for errors go func() { if err := aggregator.Start(); err != nil { + aggregator.Stop() log.Fatal(err) } }()