From edbf76eea7589417fff8f06131aec0b81051df25 Mon Sep 17 00:00:00 2001 From: Philip Hurst Date: Thu, 3 Oct 2024 12:42:53 -0400 Subject: [PATCH] Large postgres logs (#105) Support for large Postgres files Large Postgres files may pose a challenge for some systems. This change stores the file on disk, compares the filesize with the remote, and adds the local file to the tar. It has error handling to allow a support export to be generated when there are issues with gathering PG logs. * fixes for lint errors * Remove the cat logic for gathering PG logs. Only do streaming. * Create a unique subdirectory in outputDir (matching the unique timestamp) so that local files are logically separate from other files. Adding messaging and error handling so a user understands. * remove extra call to getRemoteFileSize * refactored getRemoteFileSize to use bashCommand to get the filesize * added copyFile function to cat a file's contents to a local file * use the copyFile function rather than calling it manually * added example comment for fileSpecSrc * added comment to copyFile * rename ConvertBytes to convertBytes --------- Co-authored-by: Philip Hurst --- internal/cmd/exec.go | 10 ++ internal/cmd/export.go | 218 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 209 insertions(+), 19 deletions(-) diff --git a/internal/cmd/exec.go b/internal/cmd/exec.go index 9688b47..2359e69 100644 --- a/internal/cmd/exec.go +++ b/internal/cmd/exec.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "io" + "os" ) // Executor calls commands @@ -110,6 +111,15 @@ func (exec Executor) catFile(filePath string) (string, string, error) { return stdout.String(), stderr.String(), err } +// copyFile takes the full path of a file and a local destination to save the +// file on disk +func (exec Executor) copyFile(source string, destination *os.File) (string, error) { + var stderr bytes.Buffer + command := fmt.Sprintf("cat %s", source) + err := exec(nil, destination, &stderr, "bash", "-ceu", "--", command) + return
stderr.String(), err +} + // patronictl takes a patronictl subcommand and returns the output of that command func (exec Executor) patronictl(cmd, output string) (string, string, error) { var stdout, stderr bytes.Buffer diff --git a/internal/cmd/export.go b/internal/cmd/export.go index f9ca1a2..0105352 100644 --- a/internal/cmd/export.go +++ b/internal/cmd/export.go @@ -23,6 +23,8 @@ import ( "io" "os" "os/exec" + "path/filepath" + "strconv" "strings" "text/tabwriter" "time" @@ -449,7 +451,8 @@ Collecting PGO CLI logs... // All Postgres Logs on the Postgres Instances (primary and replicas) if numLogs > 0 { if err == nil { - err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd) + err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, + namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd) } } @@ -955,6 +958,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context, config *rest.Config, namespace string, clusterName string, + outputDir string, + outputFile string, numLogs int, tw *tar.Writer, cmd *cobra.Command, @@ -1024,30 +1029,57 @@ func gatherPostgresLogsAndConfigs(ctx context.Context, } logFiles := strings.Split(strings.TrimSpace(stdout), "\n") - for _, logFile := range logFiles { - writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile)) - var buf bytes.Buffer - stdout, stderr, err := Executor(exec).catFile(logFile) + // localDirectory is created to save data on disk + // e.g. 
outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath + localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", "")) + + // flag to determine whether or not to remove localDirectory after the loop + // When an error happens, this flag will switch to false + // It's nice to have the extra data around when errors have happened + doCleanup := true + + for _, logFile := range logFiles { + // get the file size to stream + fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile) if err != nil { - if apierrors.IsForbidden(err) { - writeInfo(cmd, err.Error()) - // Continue and output errors for each log file - // Allow the user to see and address all issues at once - continue - } - return err + writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err)) + continue } - buf.Write([]byte(stdout)) - if stderr != "" { - str := fmt.Sprintf("\nError returned: %s\n", stderr) - buf.Write([]byte(str)) + // fileSpecSrc is namespace/podname:path/to/file + // fileSpecDest is the local destination of the file + // These are used to help the user grab the file manually when necessary + // e.g. 
postgres-operator/hippo-instance1-vp9k-0:pgdata/pg16/log/postgresql-Tue.log + fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile) + fileSpecDest := filepath.Join(localDirectory, logFile) + writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, convertBytes(fileSize))) + + // Stream the file to disk and write the local file to the tar + err = streamFileFromPod(config, tw, + localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize) + + if err != nil { + doCleanup = false // prevent the deletion of localDirectory so a user can examine contents + writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err)) + writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s", + util.ContainerDatabase, fileSpecSrc, fileSpecDest)) + writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory)) + continue } - path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile - if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil { - return err + } + + // doCleanup is true when there are no errors above. + if doCleanup { + // Remove the local directory created to hold the data + // Errors in removing localDirectory should instruct the user to remove manually. + // This happens often on Windows + err = os.RemoveAll(localDirectory) + if err != nil { + writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err)) + writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory)) + continue } } @@ -1800,3 +1832,151 @@ func writeDebug(cmd *cobra.Command, s string) { t := time.Now() cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s) } + +// streamFileFromPod streams the file from the Kubernetes pod to a local file. 
+func streamFileFromPod(config *rest.Config, tw *tar.Writer, + localDirectory, clusterName, namespace, podName, containerName, remotePath string, + remoteFileSize int64) error { + + // create localPath to write the streamed data from remotePath + // use the uniqueness of outputFile to avoid overwriting other files + localPath := filepath.Join(localDirectory, remotePath) + if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil { + return fmt.Errorf("failed to create path for file: %w", err) + } + outFile, err := os.Create(filepath.Clean(localPath)) + if err != nil { + return fmt.Errorf("failed to create local file: %w", err) + } + + defer func() { + // ignore any errors from Close functions, the writers will be + // closed when the program exits + if outFile != nil { + _ = outFile.Close() + } + }() + + // Get Postgres Log Files + podExec, err := util.NewPodExecutor(config) + if err != nil { + return err + } + exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string, + ) error { + return podExec(namespace, podName, containerName, + stdin, stdout, stderr, command...) 
+ } + + _, err = Executor(exec).copyFile(remotePath, outFile) + if err != nil { + return fmt.Errorf("error during file streaming: %w", err) + } + + // compare file sizes + localFileInfo, err := os.Stat(localPath) + if err != nil { + return fmt.Errorf("failed to get file info: %w", err) + } + if remoteFileSize != localFileInfo.Size() { + return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v", + remoteFileSize, localFileInfo.Size()) + } + + // add localPath to the support export tar + tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath) + err = addFileToTar(tw, localPath, tarPath) + if err != nil { + return fmt.Errorf("error writing to tar: %w", err) + } + + return nil +} + +// addFileToTar copies a local file into a tar archive +func addFileToTar(tw *tar.Writer, localPath, tarPath string) error { + // Open the file to be added to the tar + file, err := os.Open(filepath.Clean(localPath)) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer func() { + // ignore any errors from Close functions, the writers will be + // closed when the program exits + if file != nil { + _ = file.Close() + } + }() + + // Get file info to create tar header + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to get file info: %w", err) + } + + // Create tar header + header := &tar.Header{ + Name: tarPath, // Name in the tar archive + Size: fileInfo.Size(), // File size + Mode: int64(fileInfo.Mode()), // File mode + ModTime: fileInfo.ModTime(), // Modification time + } + + // Write header to the tar + err = tw.WriteHeader(header) + if err != nil { + return fmt.Errorf("failed to write tar header: %w", err) + } + + // Stream the file content to the tar + _, err = io.Copy(tw, file) + if err != nil { + return fmt.Errorf("failed to copy file data to tar: %w", err) + } + + return nil +} + +// getRemoteFileSize returns the size of a file within a container so that we can stream its contents +func 
getRemoteFileSize(config *rest.Config, + namespace string, podName string, containerName string, filePath string) (int64, error) { + + podExec, err := util.NewPodExecutor(config) + if err != nil { + return 0, fmt.Errorf("could not create executor: %w", err) + } + exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string, + ) error { + return podExec(namespace, podName, containerName, + stdin, stdout, stderr, command...) + } + + // Prepare the command to get the file size using "stat -c %s " + command := fmt.Sprintf("stat -c %s %s", "%s", filePath) + stdout, stderr, err := Executor(exec).bashCommand(command) + if err != nil { + return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr) + } + + // Parse the file size from stdout + size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse file size: %w", err) + } + + return size, nil +} + +// convertBytes converts a byte size (int64) into a human-readable format. +func convertBytes(bytes int64) string { + const unit = 1024 + if bytes < unit { + return fmt.Sprintf("%d B", bytes) + } + div, exp := int64(unit), 0 + for n := bytes / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp]) +}