Skip to content

Commit

Permalink
Merge pull request #15 from rogosprojects/debug/race-conditions
Browse files Browse the repository at this point in the history
fix: avoid race conditions!
  • Loading branch information
rogosprojects authored Aug 20, 2024
2 parents 38fa5b3 + 20dd2a4 commit d67a361
Showing 1 changed file with 123 additions and 123 deletions.
246 changes: 123 additions & 123 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"bufio"
"context"
"fmt"
"github.com/mattn/go-tty"
"io"
"k8s.io/client-go/util/homedir"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/mattn/go-tty"

"atomicgo.dev/keyboard/keys"
"github.com/pterm/pterm"
"github.com/pterm/pterm/putils"
Expand All @@ -24,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

var (
Expand All @@ -44,27 +44,13 @@ var (

var (
allPods *bool
anyLogFound bool
defaultLogPath = "logs/" + time.Now().Format("2006-01-02T15-04")
wg sync.WaitGroup
)

// NotifyReadSize notify some bytes have been read
type NotifyReadSize func(total int, delta int)

// MeteredReader decorate Reader to measure reads
type MeteredReader struct {
reader io.Reader
total int
notify NotifyReadSize
}

// notify progress through specified function
func (w *MeteredReader) Read(p []byte) (int, error) {
size, err := w.reader.Read(p)
w.total += size
w.notify(w.total, size)
return size, err
}
const (
fileNameSeparator = "__"
)

// splashScreen prints the splash screen!
func splashScreen() {
Expand All @@ -91,6 +77,7 @@ func configClient() {
if err != nil {
pterm.Fatal.Printfln("kubeconfig error while reading %s\nPlease provide a valid kubeconfig file with \"--kubeconfig <file_path>\"", *kubeconfig)
}
config.Burst = 100

// create the client
client, err = kubernetes.NewForConfig(config)
Expand Down Expand Up @@ -234,44 +221,72 @@ func getLopOpts() v1.PodLogOptions {
}

// getPodLogs gets logs for the pods
func getPodLogs(pods v1.PodList, logOpts v1.PodLogOptions) {
var wg sync.WaitGroup
// Create a multi printer for managing multiple printers
multiPrinter := pterm.DefaultMultiPrinter
multiPrinter.Start()

for _, pod := range pods.Items {
func getPodLogs(pods v1.PodList, logOpts v1.PodLogOptions) []string {
var logFiles []string
var trees []*pterm.TreePrinter
for i, pod := range pods.Items {
var _podTree = pterm.TreeNode{
Text: pterm.Info.
WithPrefix(pterm.Prefix{Text: "[Pod]", Style: pterm.Info.MessageStyle}).
WithPrefix(pterm.Prefix{Text: pod.Name}).
WithMessageStyle(pterm.DefaultBasicText.Style).
Sprintf(pod.Name),
Sprintf(pterm.Sprintf(pterm.Blue("[Pod #%d]"), i+1)),
}
var containerTree []pterm.TreeNode

for _, container := range pod.Spec.Containers {
containerTree = append(containerTree, pterm.TreeNode{Text: container.Name})
_podTree.Children = containerTree

logFile := createLogFile(pod.Name, container.Name)
logFiles = append(logFiles, logFile.Name())

wg.Add(1)
go streamLog(pod, container, logOpts, &wg, &multiPrinter)
go streamLog(pod, container, logFile, logOpts)
}
err := pterm.DefaultTree.WithRoot(_podTree).Render()
trees = append(trees, pterm.DefaultTree.WithRoot(_podTree))
}

// Print the tree
pterm.Info.Printfln(pterm.Sprintf("Found %s Pod(s) %s Container(s)", pterm.Green(len(pods.Items)), pterm.Green(len(logFiles))))
for _, tree := range trees {
err := tree.Render()
if err != nil {
return
panic(err.Error())
}
}
if *follow {
pterm.Info.Printfln("Press %s to stop streaming logs in %s", pterm.Green("q"), pterm.Green(*logPath))
pressKeyToExit()
pterm.Info.Printfln("Acquiring logs 🚀")

return logFiles
}

func printLogSize(logFile []string) {
if (len(logFile)) == 0 {
pterm.Error.Printfln("No logs saved")
return
}
pterm.Info.Printfln("Logs saved to " + pterm.Green(*logPath))

for _, log := range logFile {
_log := filepath.Base(log)
fileInfo, err := os.Stat(log)
if err != nil {
continue
}

podName, containerName := strings.Split(_log, fileNameSeparator)[0], strings.Split(_log, fileNameSeparator)[1]

s := pterm.Style{pterm.FgGray, pterm.BgDefault, pterm.Italic}

pterm.Info.WithPrefix(
pterm.Prefix{
Text: podName + fileNameSeparator + pterm.Green(containerName),
}).Println(pterm.DefaultBasicText.WithStyle(&s).Sprintf("\t" + convertBytes(fileInfo.Size())))
}

// wait for all goroutines to finish
wg.Wait()
}

// streamLog streams logs for the container
func streamLog(pod v1.Pod, container v1.Container, logOpts v1.PodLogOptions, wg *sync.WaitGroup, multiPrinter *pterm.MultiPrinter) {
func streamLog(pod v1.Pod, container v1.Container, logFile *os.File, logOpts v1.PodLogOptions) {
defer wg.Done()

logOpts.Container = container.Name
Expand All @@ -285,47 +300,20 @@ func streamLog(pod v1.Pod, container v1.Container, logOpts v1.PodLogOptions, wg
//containerTree = append(containerTree, pterm.TreeNode{Text: pterm.Red(container.Name)})
return
}

writeLogToDisk(logs, pod.Name, container.Name, multiPrinter)

}

// findPodByLabel finds pods by label
func findPodByLabel(label string) v1.PodList {
pterm.Info.Printf("Getting Pods in namespace %s with label %s\n\n", pterm.Green(*namespace), pterm.Green(label))

pods, err := client.CoreV1().Pods(*namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: label,
})
if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pods in namespace %s: %v\n",
*namespace, statusError.ErrStatus.Message)
}
if err != nil {
panic(err.Error())
}

// if pods are not found print message
if len(pods.Items) == 0 {
pterm.Error.Printfln("No pods found in namespace %s with label %s\n", *namespace, label)
}

return *pods
}

// writeLogToDisk writes logs to disk
func writeLogToDisk(logs io.ReadCloser, podName string, containerName string, multiPrinter *pterm.MultiPrinter) {
anyLogFound = true

logName := fmt.Sprintf("%s-%s.log", podName, containerName)

defer func(logs io.ReadCloser) {
err := logs.Close()
if err != nil {
panic(err.Error())
}
}(logs)

writeLogToDisk(logs, logFile)

}

func createLogFile(podName string, containerName string) *os.File {
logName := fmt.Sprintf("%s%s%s.log", podName, fileNameSeparator, containerName)

// Create the log file
if err := os.MkdirAll(*logPath, 0755); err != nil {
panic(err.Error())
Expand All @@ -336,29 +324,13 @@ func writeLogToDisk(logs io.ReadCloser, podName string, containerName string, mu
if err != nil {
panic(err.Error())
}
defer func(logFile *os.File) {
err := logFile.Close()
if err != nil {
panic(err.Error())
}
}(logFile)

spinnerLog, _ := pterm.DefaultSpinner.WithWriter(multiPrinter.NewWriter()).
WithRemoveWhenDone(false).Start("Acquiring logs...")
defer spinnerLog.Stop()

reader := &MeteredReader{reader: bufio.NewReader(logs), notify: func(total, delta int) {
s := pterm.Style{pterm.FgWhite, pterm.BgDefault, pterm.Bold, pterm.Italic}

spinnerLog.UpdateText(pterm.Info.WithPrefix(
pterm.Prefix{
Text: convertBytes(total),
Style: &s,
}).
WithMessageStyle(&s).
Sprintf("%s/%s", podName, containerName))
return logFile
}

}}
// writeLogToDisk writes logs to disk
func writeLogToDisk(logs io.ReadCloser, logFile *os.File) {
reader := bufio.NewReader(logs)

// Create a buffered reader and writer
writer := bufio.NewWriter(logFile)
Expand All @@ -374,7 +346,54 @@ func writeLogToDisk(logs io.ReadCloser, podName string, containerName string, mu
}
}

func convertBytes(bytes int) string {
// findPodByLabel finds pods by label
func findPodByLabel(label string) v1.PodList {
pterm.Info.Printf("Getting Pods with label %s\n\n", pterm.Green(label))

pods, err := client.CoreV1().Pods(*namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: label,
})
if statusError, isStatus := err.(*errors.StatusError); isStatus {
fmt.Printf("Error getting pods in namespace %s: %v\n",
*namespace, statusError.ErrStatus.Message)
}
if err != nil {
panic(err.Error())
}

// if pods are not found print message
if len(pods.Items) == 0 {
pterm.Error.Printfln(pterm.Sprintf("No pods found in namespace %s with label %s\n", *namespace, label))
}

return *pods
}

func pressKeyToExit() {
t, errTty := tty.Open()
if errTty != nil {
panic(errTty)
}
defer t.Close()

// race condition with spinner: known issue, we don't care
spinnerLog, _ := pterm.DefaultSpinner.WithSequence(". ", ".. ", ".|.", " ..", " .").WithRemoveWhenDone(true).Start(pterm.Sprintf("Press %s to stop streaming logs in %s", pterm.Green("q"), pterm.Green(*logPath)))
defer spinnerLog.Stop()

for {
key, err := t.ReadRune()
if err != nil {
panic(err)
}
// if pressed q or Q
if key == 113 || key == 81 {
//pterm.Info.Printfln("Exiting")
break
}
}
}

func convertBytes(bytes int64) string {
if bytes == 0 {
return pterm.Red(" (0 B)")
}
Expand Down Expand Up @@ -414,37 +433,18 @@ It is designed to be fast and efficient, and can get logs from multiple Pods/Con
}
}

getPodLogs(podList, getLopOpts())

if anyLogFound {
pterm.Info.Printfln("Logs saved to %s", pterm.Green(*logPath))
}
},
}

func pressKeyToExit() {
t, errTty := tty.Open()
if errTty != nil {
panic(errTty)
}

go func() {
defer t.Close()
for {
key, err := t.ReadRune()
if err != nil {
panic(err)
}
logFiles := getPodLogs(podList, getLopOpts())

// if pressed q or Q
if key == 113 || key == 81 {
pterm.Info.Printfln("Exiting")
break
}
if *follow && len(logFiles) > 0 {
// press a key to terminate the process
pressKeyToExit()
} else {
// wait for all goroutines to finish
wg.Wait()
}
os.Exit(0)
}()

printLogSize(logFiles)
},
}

// Execute is the entry point for the command
Expand Down

0 comments on commit d67a361

Please sign in to comment.