diff --git a/internal/cmd/exec.go b/internal/cmd/exec.go index 9688b47..2359e69 100644 --- a/internal/cmd/exec.go +++ b/internal/cmd/exec.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "io" + "os" ) // Executor calls commands @@ -110,6 +111,15 @@ func (exec Executor) catFile(filePath string) (string, string, error) { return stdout.String(), stderr.String(), err } +// copyFile takes the full path of a file and a local destination to save the +// file on disk +func (exec Executor) copyFile(source string, destination *os.File) (string, error) { + var stderr bytes.Buffer + command := fmt.Sprintf("cat %s", source) + err := exec(nil, destination, &stderr, "bash", "-ceu", "--", command) + return stderr.String(), err +} + // patronictl takes a patronictl subcommand and returns the output of that command func (exec Executor) patronictl(cmd, output string) (string, string, error) { var stdout, stderr bytes.Buffer diff --git a/internal/cmd/export.go b/internal/cmd/export.go index f9ca1a2..0105352 100644 --- a/internal/cmd/export.go +++ b/internal/cmd/export.go @@ -23,6 +23,8 @@ import ( "io" "os" "os/exec" + "path/filepath" + "strconv" "strings" "text/tabwriter" "time" @@ -449,7 +451,8 @@ Collecting PGO CLI logs... // All Postgres Logs on the Postgres Instances (primary and replicas) if numLogs > 0 { if err == nil { - err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd) + err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, + namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd) } } @@ -955,6 +958,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context, config *rest.Config, namespace string, clusterName string, + outputDir string, + outputFile string, numLogs int, tw *tar.Writer, cmd *cobra.Command, @@ -1024,30 +1029,57 @@ func gatherPostgresLogsAndConfigs(ctx context.Context, } logFiles := strings.Split(strings.TrimSpace(stdout), "\n") - for _, logFile := range logFiles { - writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile)) - var buf bytes.Buffer - stdout, stderr, err := Executor(exec).catFile(logFile) + // localDirectory is created to save data on disk + // e.g. outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath + localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", "")) + + // flag to determine whether or not to remove localDirectory after the loop + // When an error happens, this flag will switch to false + // It's nice to have the extra data around when errors have happened + doCleanup := true + + for _, logFile := range logFiles { + // get the file size to stream + fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile) if err != nil { - if apierrors.IsForbidden(err) { - writeInfo(cmd, err.Error()) - // Continue and output errors for each log file - // Allow the user to see and address all issues at once - continue - } - return err + writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err)) + continue } - buf.Write([]byte(stdout)) - if stderr != "" { - str := fmt.Sprintf("\nError returned: %s\n", stderr) - buf.Write([]byte(str)) + // fileSpecSrc is namespace/podname:path/to/file + // fileSpecDest is the local destination of the file + // These are used to help the user grab the file manually when necessary + // e.g. postgres-operator/hippo-instance1-vp9k-0:pgdata/pg16/log/postgresql-Tue.log + fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile) + fileSpecDest := filepath.Join(localDirectory, logFile) + writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, convertBytes(fileSize))) + + // Stream the file to disk and write the local file to the tar + err = streamFileFromPod(config, tw, + localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize) + + if err != nil { + doCleanup = false // prevent the deletion of localDirectory so a user can examine contents + writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err)) + writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s", + util.ContainerDatabase, fileSpecSrc, fileSpecDest)) + writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory)) + continue } - path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile - if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil { - return err + } + + // doCleanup is true when there are no errors above. + if doCleanup { + // Remove the local directory created to hold the data + // Errors in removing localDirectory should instruct the user to remove manually. + // This happens often on Windows + err = os.RemoveAll(localDirectory) + if err != nil { + writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err)) + writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory)) + continue } } @@ -1800,3 +1832,151 @@ func writeDebug(cmd *cobra.Command, s string) { t := time.Now() cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s) } + +// streamFileFromPod streams the file from the Kubernetes pod to a local file. +func streamFileFromPod(config *rest.Config, tw *tar.Writer, + localDirectory, clusterName, namespace, podName, containerName, remotePath string, + remoteFileSize int64) error { + + // create localPath to write the streamed data from remotePath + // use the uniqueness of outputFile to avoid overwriting other files + localPath := filepath.Join(localDirectory, remotePath) + if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil { + return fmt.Errorf("failed to create path for file: %w", err) + } + outFile, err := os.Create(filepath.Clean(localPath)) + if err != nil { + return fmt.Errorf("failed to create local file: %w", err) + } + + defer func() { + // ignore any errors from Close functions, the writers will be + // closed when the program exits + if outFile != nil { + _ = outFile.Close() + } + }() + + // Get Postgres Log Files + podExec, err := util.NewPodExecutor(config) + if err != nil { + return err + } + exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string, + ) error { + return podExec(namespace, podName, containerName, + stdin, stdout, stderr, command...) + } + + _, err = Executor(exec).copyFile(remotePath, outFile) + if err != nil { + return fmt.Errorf("error during file streaming: %w", err) + } + + // compare file sizes + localFileInfo, err := os.Stat(localPath) + if err != nil { + return fmt.Errorf("failed to get file info: %w", err) + } + if remoteFileSize != localFileInfo.Size() { + return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v", + remoteFileSize, localFileInfo.Size()) + } + + // add localPath to the support export tar + tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath) + err = addFileToTar(tw, localPath, tarPath) + if err != nil { + return fmt.Errorf("error writing to tar: %w", err) + } + + return nil +} + +// addFileToTar copies a local file into a tar archive +func addFileToTar(tw *tar.Writer, localPath, tarPath string) error { + // Open the file to be added to the tar + file, err := os.Open(filepath.Clean(localPath)) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer func() { + // ignore any errors from Close functions, the writers will be + // closed when the program exits + if file != nil { + _ = file.Close() + } + }() + + // Get file info to create tar header + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to get file info: %w", err) + } + + // Create tar header + header := &tar.Header{ + Name: tarPath, // Name in the tar archive + Size: fileInfo.Size(), // File size + Mode: int64(fileInfo.Mode()), // File mode + ModTime: fileInfo.ModTime(), // Modification time + } + + // Write header to the tar + err = tw.WriteHeader(header) + if err != nil { + return fmt.Errorf("failed to write tar header: %w", err) + } + + // Stream the file content to the tar + _, err = io.Copy(tw, file) + if err != nil { + return fmt.Errorf("failed to copy file data to tar: %w", err) + } + + return nil +} + +// getRemoteFileSize returns the size of a file within a container so that we can stream its contents +func getRemoteFileSize(config *rest.Config, + namespace string, podName string, containerName string, filePath string) (int64, error) { + + podExec, err := util.NewPodExecutor(config) + if err != nil { + return 0, fmt.Errorf("could not create executor: %w", err) + } + exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string, + ) error { + return podExec(namespace, podName, containerName, + stdin, stdout, stderr, command...) + } + + // Prepare the command to get the file size using "stat -c %s " + command := fmt.Sprintf("stat -c %s %s", "%s", filePath) + stdout, stderr, err := Executor(exec).bashCommand(command) + if err != nil { + return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr) + } + + // Parse the file size from stdout + size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse file size: %w", err) + } + + return size, nil +} + +// convertBytes converts a byte size (int64) into a human-readable format. +func convertBytes(bytes int64) string { + const unit = 1024 + if bytes < unit { + return fmt.Sprintf("%d B", bytes) + } + div, exp := int64(unit), 0 + for n := bytes / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp]) +}