Skip to content

Commit

Permalink
Refactor stream offset
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Nov 27, 2023
1 parent 59ecf68 commit 169f3c8
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,12 @@ func RootCmd() *cobra.Command {
}

// parse stream offset
switch streamOffset {
case "":
cfg.StreamOffset = nil
case "next", "first", "last":
cfg.StreamOffset = streamOffset
default:
// check if streamOffset can be parsed as unsigned integer (chunkID)
if chunkID, err := strconv.ParseUint(streamOffset, 10, 64); err == nil {
cfg.StreamOffset = chunkID
break
}
// check if streamOffset can be parsed as an ISO 8601 timestamp
if timestamp, err := iso8601.ParseString(streamOffset); err == nil {
cfg.StreamOffset = timestamp
break
}
fmt.Fprintf(os.Stderr, "ERROR: invalid stream offset: %s\n", streamOffset)
offset, err := parseStreamOffset(streamOffset)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
cfg.StreamOffset = offset
},
}
rootCmd.PersistentFlags().StringVarP(&cfg.PublisherUri, "publisher-uri", "", "", "URI for publishing")
Expand Down Expand Up @@ -309,6 +296,25 @@ func defaultUri(proto string) string {
return uri
}

func parseStreamOffset(offset string) (any, error) {
switch offset {
case "":
return nil, nil
case "next", "first", "last":
return offset, nil
default:
// check if streamOffset can be parsed as unsigned integer (chunkID)
if chunkID, err := strconv.ParseUint(offset, 10, 64); err == nil {
return chunkID, nil
}
// check if streamOffset can be parsed as an ISO 8601 timestamp
if timestamp, err := iso8601.ParseString(offset); err == nil {
return timestamp, nil
}
}
return nil, fmt.Errorf("invalid stream offset: %s", offset)
}

func shutdown() {
metricsServer := metrics.GetMetricsServer()
metricsServer.PrintMetrics()
Expand Down

0 comments on commit 169f3c8

Please sign in to comment.