diff --git a/go.mod b/go.mod index 2a9f517c..0669cd71 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/vishvananda/netlink v1.2.1-beta.2.0.20230206183746-70ca0345eede + github.com/yuin/goldmark v1.4.13 gocloud.dev v0.28.0 golang.org/x/crypto v0.13.0 golang.org/x/oauth2 v0.7.0 @@ -103,7 +104,6 @@ require ( github.com/shopspring/decimal v1.2.0 // indirect github.com/spf13/cast v1.3.1 // indirect github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect - github.com/yuin/goldmark v1.4.13 // indirect go.mongodb.org/mongo-driver v1.10.2 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/net v0.15.0 // indirect diff --git a/pkg/backend/status.go b/pkg/backend/status.go index e533c1bb..6c8b0dbc 100644 --- a/pkg/backend/status.go +++ b/pkg/backend/status.go @@ -18,13 +18,21 @@ package backend import ( "bytes" + "context" + "fmt" "sort" + "strconv" "strings" "sync" "time" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/olekukonko/tablewriter" + "github.com/yuin/goldmark" + "github.com/yuin/goldmark/extension" + "github.com/yuin/goldmark/parser" + "github.com/yuin/goldmark/renderer/html" "k8s.io/apimachinery/pkg/util/duration" "k8s.io/klog/v2" ) @@ -59,7 +67,7 @@ func (ms MachineStatus) Strings() []string { return []string{ ms.Name, string(ms.Status), - ConvertToHumanReadableDateType(ms.Timestamp), + ConvertToHumanReadableDateType(&ms.Timestamp), ms.Comment, } } @@ -76,7 +84,7 @@ func NewStatusReporter(nc *nats.Conn) (*StatusReporter, error) { return sp, err } -func (m *StatusReporter) setStatus(msg []byte) { +func (sp *StatusReporter) setStatus(msg []byte) { fields := strings.SplitN(string(msg), ",", 3) if len(fields) < 2 { klog.Errorln("bad status report ", string(msg)) @@ -92,27 +100,27 @@ func (m *StatusReporter) setStatus(msg []byte) { cur.Comment = fields[2] } - m.mu.Lock() - defer m.mu.Unlock() + sp.mu.Lock() + defer sp.mu.Unlock() - last, found := m.inventory[cur.Name] + last, found := sp.inventory[cur.Name] if !found || last.Status != cur.Status { - m.inventory[cur.Name] = cur + sp.inventory[cur.Name] = cur } else { cur.Timestamp = last.Timestamp if cur.Comment == "" { cur.Comment = last.Comment } - m.inventory[cur.Name] = cur + sp.inventory[cur.Name] = cur } } -func (m *StatusReporter) Render() []byte { - m.mu.Lock() - defer m.mu.Unlock() +func (sp *StatusReporter) renderRunnerInfo() []byte { + sp.mu.Lock() + defer sp.mu.Unlock() - data := make([][]string, 0, len(m.inventory)) - for _, s := range m.inventory { + data := make([][]string, 0, len(sp.inventory)) + for _, s := range sp.inventory { data = append(data, s.Strings()) } sort.Slice(data, func(i, j int) bool { @@ -135,14 +143,14 @@ func (m *StatusReporter) Render() []byte { // human-readable approximation. // ref: https://github.com/kubernetes/apimachinery/blob/v0.21.1/pkg/api/meta/table/table.go#L63-L70 // But works for timestamp before or after now. -func ConvertToHumanReadableDateType(timestamp time.Time) string { - if timestamp.IsZero() { +func ConvertToHumanReadableDateType(timestamp *time.Time) string { + if timestamp == nil || timestamp.IsZero() { return "" } var d time.Duration now := time.Now() - if now.After(timestamp) { - d = now.Sub(timestamp) + if now.After(*timestamp) { + d = now.Sub(*timestamp) } else { d = timestamp.Sub(now) } @@ -159,3 +167,167 @@ func ReportStatus(nc *nats.Conn, name string, s Status, comments ...string) { klog.Errorln(err) } } + +func (sp *StatusReporter) GenerateMarkdownReport() ([]byte, error) { + var buf bytes.Buffer + + buf.WriteString("## Runners") + buf.WriteRune('\n') + buf.Write(sp.renderRunnerInfo()) + buf.WriteRune('\n') + + names := []string{ + "gha_queued", + "gha_completed", + } + streams, err := CollectStreamInfo(sp.nc, names) + if err != nil { + return nil, err + } + buf.WriteString("## Streams") + buf.WriteRune('\n') + buf.Write(RenderStreamInfo(streams)) + buf.WriteRune('\n') + + for _, name := range names { + consumers, err := CollectConsumerInfo(sp.nc, name) + if err != nil { + return nil, err + } + buf.WriteString(fmt.Sprintf("## Consumers for Stream: %s\n", name)) + buf.WriteRune('\n') + buf.Write(RenderConsumerInfo(consumers)) + buf.WriteRune('\n') + } + return buf.Bytes(), nil +} + +func (sp *StatusReporter) GenerateHTMLReport() ([]byte, error) { + md := goldmark.New( + goldmark.WithExtensions(extension.GFM), + goldmark.WithParserOptions( + parser.WithAutoHeadingID(), + ), + goldmark.WithRendererOptions( + html.WithHardWraps(), + html.WithXHTML(), + ), + ) + + data, err := sp.GenerateMarkdownReport() + if err != nil { + return nil, err + } + + var bufHtml bytes.Buffer + if err := md.Convert(data, &bufHtml); err != nil { + return nil, err + } + return bufHtml.Bytes(), nil +} + +func CollectStreamInfo(nc *nats.Conn, names []string) ([]*jetstream.StreamInfo, error) { + js, err := jetstream.New(nc) + if err != nil { + return nil, err + } + + ctx := context.TODO() + result := make([]*jetstream.StreamInfo, 0, len(names)) + for _, name := range names { + if s, err := js.Stream(ctx, name); err != nil { + return nil, err + } else { + info, err := s.Info(ctx) + if err != nil { + return nil, err + } + if info != nil { + result = append(result, info) + } + } + } + return result, nil +} + +func RenderStreamInfo(streams []*jetstream.StreamInfo) []byte { + data := make([][]string, 0, len(streams)) + for _, s := range streams { + data = append(data, []string{ + s.Config.Name, + ConvertToHumanReadableDateType(&s.Created), + strconv.FormatUint(s.State.Msgs, 10), + ConvertToHumanReadableDateType(&s.State.LastTime), + }) + } + sort.Slice(data, func(i, j int) bool { + return data[i][0] < data[j][0] + }) + + var buf bytes.Buffer + + table := tablewriter.NewWriter(&buf) + table.SetHeader([]string{"Name", "Created", "Messages", "Last Message"}) + table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false}) + table.SetCenterSeparator("|") + table.AppendBulk(data) // Add Bulk Data + table.Render() + + return buf.Bytes() +} + +func CollectConsumerInfo(nc *nats.Conn, streamName string) ([]*jetstream.ConsumerInfo, error) { + ctx := context.TODO() + js, err := jetstream.New(nc) + if err != nil { + return nil, err + } + + s, err := js.Stream(ctx, streamName) + if err != nil { + return nil, err + } + + var result []*jetstream.ConsumerInfo + + consumers := s.ListConsumers(ctx) + for cons := range consumers.Info() { + result = append(result, cons) + } + if consumers.Err() != nil { + return nil, err + } + return result, nil +} + +func RenderConsumerInfo(consumers []*jetstream.ConsumerInfo) []byte { + data := make([][]string, 0, len(consumers)) + for _, s := range consumers { + data = append(data, []string{ + s.Config.Name, + ConvertToHumanReadableDateType(&s.Created), + strconv.FormatBool(!s.PushBound), + s.Config.FilterSubject, + ConvertToHumanReadableDateType(s.Delivered.Last), + ConvertToHumanReadableDateType(s.AckFloor.Last), + strconv.Itoa(s.NumAckPending), + strconv.Itoa(s.NumRedelivered), + strconv.Itoa(s.NumWaiting), + strconv.FormatUint(s.NumPending, 10), + }) + } + sort.Slice(data, func(i, j int) bool { + return data[i][0] < data[j][0] + }) + + var buf bytes.Buffer + + table := tablewriter.NewWriter(&buf) + table.SetHeader([]string{"Name", "Created", "Pull", "Filter Subject", "Last Delivery", "Last Ack", "NumAckPending", "NumRedelivered", "NumWaiting", "NumPending"}) + table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false}) + table.SetCenterSeparator("|") + table.AppendBulk(data) // Add Bulk Data + table.Render() + + return buf.Bytes() +} diff --git a/pkg/cmds/webhook.go b/pkg/cmds/webhook.go index 1966067e..b796eec9 100644 --- a/pkg/cmds/webhook.go +++ b/pkg/cmds/webhook.go @@ -137,7 +137,12 @@ func runServer(gh *github.Client, nc *nats.Conn, sp *backend.StatusReporter) err }) r.Get("/runner-status", func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write(sp.Render()) + data, err := sp.GenerateHTMLReport() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, _ = w.Write(data) }) r.Get("/*", func(w http.ResponseWriter, r *http.Request) {