Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large postgres logs #105

Merged
merged 10 commits into from
Oct 3, 2024
231 changes: 212 additions & 19 deletions internal/cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"text/tabwriter"
"time"
Expand All @@ -47,6 +49,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/yaml"

"github.com/crunchydata/postgres-operator-client/internal"
Expand Down Expand Up @@ -449,7 +452,8 @@ Collecting PGO CLI logs...
// All Postgres Logs on the Postgres Instances (primary and replicas)
if numLogs > 0 {
if err == nil {
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd)
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig,
namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd)
}
}

Expand Down Expand Up @@ -955,6 +959,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
config *rest.Config,
namespace string,
clusterName string,
outputDir string,
outputFile string,
numLogs int,
tw *tar.Writer,
cmd *cobra.Command,
Expand Down Expand Up @@ -1024,30 +1030,56 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
}

logFiles := strings.Split(strings.TrimSpace(stdout), "\n")
for _, logFile := range logFiles {
writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile))
var buf bytes.Buffer

stdout, stderr, err := Executor(exec).catFile(logFile)
// localDirectory is created to save data on disk
// e.g. outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath
localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", ""))

// flag to determine whether or not to remove localDirectory after the loop
// When an error happens, this flag will switch to false
// It's nice to have the extra data around when errors have happened
doCleanup := true

for _, logFile := range logFiles {
// get the file size to stream
fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile)
if err != nil {
if apierrors.IsForbidden(err) {
writeInfo(cmd, err.Error())
// Continue and output errors for each log file
// Allow the user to see and address all issues at once
continue
}
return err
writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err))
continue
}

buf.Write([]byte(stdout))
if stderr != "" {
str := fmt.Sprintf("\nError returned: %s\n", stderr)
buf.Write([]byte(str))
// fileSpecSrc is namespace/podname:path/to/file
// fileSpecDest is the local destination of the file
// These are used to help the user grab the file manually when necessary
fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile)
fileSpecDest := filepath.Join(localDirectory, logFile)
writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, ConvertBytes(fileSize)))
philrhurst marked this conversation as resolved.
Show resolved Hide resolved

// Stream the file to disk and write the local file to the tar
err = streamFileFromPod(clientset, config, tw,
localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize)

if err != nil {
doCleanup = false // prevent the deletion of localDirectory so a user can examine contents
writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err))
writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s",
util.ContainerDatabase, fileSpecSrc, fileSpecDest))
writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory))
continue
}

path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile
if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil {
return err
}

// doCleanup is true when there are no errors above.
if doCleanup {
// Remove the local directory created to hold the data
// Errors in removing localDirectory should instruct the user to remove manually.
// This happens often on Windows
err = os.RemoveAll(localDirectory)
if err != nil {
writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err))
writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory))
continue
}
}

Expand Down Expand Up @@ -1800,3 +1832,164 @@ func writeDebug(cmd *cobra.Command, s string) {
t := time.Now()
cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s)
}

// streamFileFromPod copies one file out of a container and into the support
// export tar without holding its contents in memory.
//
// It runs "cat remotePath" in containerName of namespace/podName through the
// pod "exec" subresource, streams the command's stdout into a local file at
// localDirectory/remotePath, checks that the local file size equals
// remoteFileSize, and then adds the local file to tw under
// clusterName/pods/podName/remotePath.
//
// Any failure (creating the local path, building or running the stream, a
// size mismatch, or writing to the tar) is returned wrapped. The local file
// is not removed by this function, on success or on failure.
func streamFileFromPod(clientset *kubernetes.Clientset,
config *rest.Config, tw *tar.Writer,
localDirectory, clusterName, namespace, podName, containerName, remotePath string,
remoteFileSize int64) error {

// Mirror the remote path below localDirectory so that files with the same
// base name in different remote directories do not overwrite each other.
// The parent directories are created before the file itself.
localPath := filepath.Join(localDirectory, remotePath)
if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil {
return fmt.Errorf("failed to create path for file: %w", err)
}
outFile, err := os.Create(filepath.Clean(localPath))
if err != nil {
return fmt.Errorf("failed to create local file: %w", err)
}

defer func() {
// ignore any errors from Close functions, the writers will be
// closed when the program exits
// NOTE(review): outFile stays open while it is stat-ed and re-read below;
// it is only closed when this function returns.
if outFile != nil {
_ = outFile.Close()
}
}()

// Build the exec request. The command is sent as two separate "command"
// parameters ("cat", then the path) rather than as one shell string.
req := clientset.CoreV1().RESTClient().
Get().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", containerName).
Param("command", "cat").
Param("command", remotePath).
Param("stderr", "true").
Param("stdout", "true")

exec, err := remotecommand.NewSPDYExecutor(config, "GET", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize SPDY executor: %w", err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the util.NewPodExecutor to set this command up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I've had there is the current flow assumes the contents will be held in memory. I couldn't figure out how to use the current flow to pass to *os.File - Definitely a limitation on my part, so feedback is welcome

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, NewPodExecutor returns a func that takes stdout, stderr io.Writer, and I'm not against writing a specific function for this executor, like

func (exec Executor) copyFile(source string, destination *os.File) (string, error) {
	var stderr bytes.Buffer
        command := fmt.Sprintf("cat %s", source)
	err := exec(nil, &desination, &stderr, "bash", "-ceu", "--", command)
	return stderr.String(), err
}

That's a slight variation on the catFile func, the main difference being that it sets the output to the file (I think) and doesn't return a stdout. Not sure that would work, but that's my first thought.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one slight change &destination to destination, but I think this works.

func (exec Executor) copyFile(source string, destination *os.File) (string, error) {
	var stderr bytes.Buffer
	command := fmt.Sprintf("cat %s", source)
	err := exec(nil, destination, &stderr, "bash", "-ceu", "--", command)
	return stderr.String(), err
}


// Stream remotePath to localPath: the remote command's stdout is written
// to the local file and its stderr is forwarded to this process's stderr.
err = exec.Stream(remotecommand.StreamOptions{
Stdout: outFile,
Stderr: os.Stderr,
Tty: false,
})
if err != nil {
return fmt.Errorf("error during file streaming: %w", err)
}

// Compare file sizes: a local size that differs from the size reported for
// the remote file is treated as a failed transfer.
localFileInfo, err := os.Stat(localPath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we use os.Stat(FILE) and on line 1912 we use file.Stat() -- they're functionally the same, right? Not a blocker, just curious if we could use the same func twice. (I'm looking at the code for the two functions and they're not identical in the way I sort of thought they might be, though they both end up calling the fillFileStatFromSys helper func.)

if err != nil {
return fmt.Errorf("failed to get file info: %w", err)
}
if remoteFileSize != localFileInfo.Size() {
benjaminjb marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v",
remoteFileSize, localFileInfo.Size())
}

// Add localPath to the support export tar, using the same
// <cluster>/pods/<pod>/<remote path> layout as the tar path built here.
tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath)
err = addFileToTar(tw, localPath, tarPath)
if err != nil {
return fmt.Errorf("error writing to tar: %w", err)
}

return nil
}

// addFileToTar copies the file at localPath into the tar archive behind tw as
// a single entry named tarPath.
//
// The entry's size and modification time are taken from the opened file. Only
// the permission bits of the file mode are recorded: the remaining bits of an
// os.FileMode are Go-specific flags and are not valid tar mode bits.
//
// tw is left open; the caller remains responsible for closing it. Errors from
// opening or stat-ing the file, writing the header, or copying the contents
// are returned wrapped.
func addFileToTar(tw *tar.Writer, localPath, tarPath string) error {
	// Open the file to be added to the tar
	file, err := os.Open(filepath.Clean(localPath))
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	// The handle is read-only, so a failed Close cannot lose data; the error
	// is deliberately ignored.
	defer func() { _ = file.Close() }()

	// Stat the open handle so the header describes exactly the file being read
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to get file info: %w", err)
	}

	// Create tar header
	header := &tar.Header{
		Name:    tarPath,                       // Name in the tar archive
		Size:    fileInfo.Size(),               // File size
		Mode:    int64(fileInfo.Mode().Perm()), // Permission bits only
		ModTime: fileInfo.ModTime(),            // Modification time
	}

	// Write header to the tar
	if err := tw.WriteHeader(header); err != nil {
		return fmt.Errorf("failed to write tar header: %w", err)
	}

	// Stream the file content to the tar; io.Copy keeps memory use constant
	// regardless of the file size
	if _, err := io.Copy(tw, file); err != nil {
		return fmt.Errorf("failed to copy file data to tar: %w", err)
	}

	return nil
}

// getRemoteFileSize returns the size in bytes of filePath inside the given
// container, so that the caller can verify the file after streaming it.
//
// The size is obtained by running "stat -c %s <filePath>" in containerName of
// namespace/podName through a pod executor built from config. filePath is
// single-quoted before it is placed in the command so that spaces or shell
// metacharacters in a file name are passed to stat literally.
// NOTE(review): this assumes bashCommand hands the string to a shell, as its
// name suggests — confirm against its definition.
//
// An error is returned when the executor cannot be created, when the command
// fails (the command's stderr is included in the message), or when the
// command's output is not a base-10 integer.
func getRemoteFileSize(config *rest.Config,
	namespace string, podName string, containerName string, filePath string) (int64, error) {

	podExec, err := util.NewPodExecutor(config)
	if err != nil {
		return 0, fmt.Errorf("could not create executor: %w", err)
	}
	// Bind the pod coordinates so the Executor helpers only deal with streams
	// and the command itself.
	run := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
	) error {
		return podExec(namespace, podName, containerName,
			stdin, stdout, stderr, command...)
	}

	// Wrap the path in single quotes; an embedded single quote is written as
	// '\'' (close the quote, add an escaped quote, reopen the quote).
	quotedPath := "'" + strings.ReplaceAll(filePath, "'", `'\''`) + "'"

	// "%s" here is stat's own format directive for the size in bytes
	command := "stat -c %s " + quotedPath
	stdout, stderr, err := Executor(run).bashCommand(command)
	if err != nil {
		return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr)
	}

	// Parse the file size from stdout
	size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64)
	if err != nil {
		return 0, fmt.Errorf("failed to parse file size: %w", err)
	}

	return size, nil
}

// ConvertBytes converts a byte size (int64) into a human-readable string
// using 1024-based units and one decimal place, e.g. 1536 -> "1.5 KB".
//
// Values below 1024, including negative values, are printed as a plain byte
// count such as "512 B". Larger values are scaled to the largest unit that
// keeps the number at or above 1, from KB up to EB. A value that would round
// up to 1024.0 of a unit is shown in the next unit instead, so 1048575 is
// reported as "1.0 MB" rather than "1024.0 KB".
//
// NOTE(review): the function is only used inside this package; review feedback
// suggested unexporting it (convertBytes) once callers are updated.
func ConvertBytes(bytes int64) string {
	const (
		unit     = 1024
		suffixes = "KMGTPE"
	)
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}

	// Find the largest power of 1024 that does not exceed the value
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}

	value := float64(bytes) / float64(div)
	// %.1f rounds to one decimal, so anything from 1023.95 upward would be
	// printed as "1024.0"; promote it to the next unit when there is one.
	if value >= unit-0.05 && exp < len(suffixes)-1 {
		value /= unit
		exp++
	}
	return fmt.Sprintf("%.1f %cB", value, suffixes[exp])
}