Skip to content

Commit

Permalink
Large postgres logs (#105)
Browse files Browse the repository at this point in the history
Support for large Postgres files

Large Postgres files may pose a challenge for some systems. This change stores the file on disk, compares the filesize with the remote, and adds the local file to the tar. It has error handling to allow a support export to be generated when there are issues with gathering PG logs.

* fixes for lint errors

* Remove the cat logic for gathering PG logs. Only do streaming.

* Create a unique sudirectory in outputDir (matching the unique timestamp) so that local files are logically separate from other files. Adding messaging and error handling so a user understands.

* remove extra call to getRemoteFileSize

* refactored getRemoteFileSize to use bashCommand to get the filesize

* added copyFile function to cat a file contents to a local file

* use the copyFile function rather calling manually

* added example comment for fileSpecSrc

* added comment to copyFile

* rename ConvertBytes to convertBytes

---------

Co-authored-by: Philip Hurst <[email protected]>
  • Loading branch information
philrhurst and Philip Hurst authored Oct 3, 2024
1 parent 3a83853 commit edbf76e
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 19 deletions.
10 changes: 10 additions & 0 deletions internal/cmd/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"io"
"os"
)

// Executor calls commands
Expand Down Expand Up @@ -110,6 +111,15 @@ func (exec Executor) catFile(filePath string) (string, string, error) {
return stdout.String(), stderr.String(), err
}

// copyFile takes the full path of a file and a local destination to save the
// file on disk
func (exec Executor) copyFile(source string, destination *os.File) (string, error) {
var stderr bytes.Buffer
command := fmt.Sprintf("cat %s", source)
err := exec(nil, destination, &stderr, "bash", "-ceu", "--", command)
return stderr.String(), err
}

// patronictl takes a patronictl subcommand and returns the output of that command
func (exec Executor) patronictl(cmd, output string) (string, string, error) {
var stdout, stderr bytes.Buffer
Expand Down
218 changes: 199 additions & 19 deletions internal/cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"text/tabwriter"
"time"
Expand Down Expand Up @@ -449,7 +451,8 @@ Collecting PGO CLI logs...
// All Postgres Logs on the Postgres Instances (primary and replicas)
if numLogs > 0 {
if err == nil {
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig, namespace, clusterName, numLogs, tw, cmd)
err = gatherPostgresLogsAndConfigs(ctx, clientset, restConfig,
namespace, clusterName, outputDir, outputFile, numLogs, tw, cmd)
}
}

Expand Down Expand Up @@ -955,6 +958,8 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
config *rest.Config,
namespace string,
clusterName string,
outputDir string,
outputFile string,
numLogs int,
tw *tar.Writer,
cmd *cobra.Command,
Expand Down Expand Up @@ -1024,30 +1029,57 @@ func gatherPostgresLogsAndConfigs(ctx context.Context,
}

logFiles := strings.Split(strings.TrimSpace(stdout), "\n")
for _, logFile := range logFiles {
writeDebug(cmd, fmt.Sprintf("LOG FILE: %s\n", logFile))
var buf bytes.Buffer

stdout, stderr, err := Executor(exec).catFile(logFile)
// localDirectory is created to save data on disk
// e.g. outputDir/crunchy_k8s_support_export_2022-08-08-115726-0400/remotePath
localDirectory := filepath.Join(outputDir, strings.ReplaceAll(outputFile, ".tar.gz", ""))

// flag to determine whether or not to remove localDirectory after the loop
// When an error happens, this flag will switch to false
// It's nice to have the extra data around when errors have happened
doCleanup := true

for _, logFile := range logFiles {
// get the file size to stream
fileSize, err := getRemoteFileSize(config, namespace, pod.Name, util.ContainerDatabase, logFile)
if err != nil {
if apierrors.IsForbidden(err) {
writeInfo(cmd, err.Error())
// Continue and output errors for each log file
// Allow the user to see and address all issues at once
continue
}
return err
writeDebug(cmd, fmt.Sprintf("could not get file size for %s: %v\n", logFile, err))
continue
}

buf.Write([]byte(stdout))
if stderr != "" {
str := fmt.Sprintf("\nError returned: %s\n", stderr)
buf.Write([]byte(str))
// fileSpecSrc is namespace/podname:path/to/file
// fileSpecDest is the local destination of the file
// These are used to help the user grab the file manually when necessary
// e.g. postgres-operator/hippo-instance1-vp9k-0:pgdata/pg16/log/postgresql-Tue.log
fileSpecSrc := fmt.Sprintf("%s/%s:%s", namespace, pod.Name, logFile)
fileSpecDest := filepath.Join(localDirectory, logFile)
writeInfo(cmd, fmt.Sprintf("\tSize of %-85s %v", fileSpecSrc, convertBytes(fileSize)))

// Stream the file to disk and write the local file to the tar
err = streamFileFromPod(config, tw,
localDirectory, clusterName, namespace, pod.Name, util.ContainerDatabase, logFile, fileSize)

if err != nil {
doCleanup = false // prevent the deletion of localDirectory so a user can examine contents
writeInfo(cmd, fmt.Sprintf("\tError streaming file %s: %v", logFile, err))
writeInfo(cmd, fmt.Sprintf("\tCollect manually with kubectl cp -c %s %s %s",
util.ContainerDatabase, fileSpecSrc, fileSpecDest))
writeInfo(cmd, fmt.Sprintf("\tRemove %s manually after gathering necessary information", localDirectory))
continue
}

path := clusterName + fmt.Sprintf("/pods/%s/", pod.Name) + logFile
if err := writeTar(tw, buf.Bytes(), path, cmd); err != nil {
return err
}

// doCleanup is true when there are no errors above.
if doCleanup {
// Remove the local directory created to hold the data
// Errors in removing localDirectory should instruct the user to remove manually.
// This happens often on Windows
err = os.RemoveAll(localDirectory)
if err != nil {
writeInfo(cmd, fmt.Sprintf("\tError removing %s: %v", localDirectory, err))
writeInfo(cmd, fmt.Sprintf("\tYou may need to remove %s manually", localDirectory))
continue
}
}

Expand Down Expand Up @@ -1800,3 +1832,151 @@ func writeDebug(cmd *cobra.Command, s string) {
t := time.Now()
cmd.Printf("%s - DEBUG - %s", t.Format(logTimeFormat), s)
}

// streamFileFromPod streams the file from the Kubernetes pod to a local file.
func streamFileFromPod(config *rest.Config, tw *tar.Writer,
localDirectory, clusterName, namespace, podName, containerName, remotePath string,
remoteFileSize int64) error {

// create localPath to write the streamed data from remotePath
// use the uniqueness of outputFile to avoid overwriting other files
localPath := filepath.Join(localDirectory, remotePath)
if err := os.MkdirAll(filepath.Dir(localPath), 0750); err != nil {
return fmt.Errorf("failed to create path for file: %w", err)
}
outFile, err := os.Create(filepath.Clean(localPath))
if err != nil {
return fmt.Errorf("failed to create local file: %w", err)
}

defer func() {
// ignore any errors from Close functions, the writers will be
// closed when the program exits
if outFile != nil {
_ = outFile.Close()
}
}()

// Get Postgres Log Files
podExec, err := util.NewPodExecutor(config)
if err != nil {
return err
}
exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
return podExec(namespace, podName, containerName,
stdin, stdout, stderr, command...)
}

_, err = Executor(exec).copyFile(remotePath, outFile)
if err != nil {
return fmt.Errorf("error during file streaming: %w", err)
}

// compare file sizes
localFileInfo, err := os.Stat(localPath)
if err != nil {
return fmt.Errorf("failed to get file info: %w", err)
}
if remoteFileSize != localFileInfo.Size() {
return fmt.Errorf("filesize mismatch: remote size is %v and local size is %v",
remoteFileSize, localFileInfo.Size())
}

// add localPath to the support export tar
tarPath := fmt.Sprintf("%s/pods/%s/%s", clusterName, podName, remotePath)
err = addFileToTar(tw, localPath, tarPath)
if err != nil {
return fmt.Errorf("error writing to tar: %w", err)
}

return nil
}

// addFileToTar copies a local file into a tar archive
func addFileToTar(tw *tar.Writer, localPath, tarPath string) error {
// Open the file to be added to the tar
file, err := os.Open(filepath.Clean(localPath))
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer func() {
// ignore any errors from Close functions, the writers will be
// closed when the program exits
if file != nil {
_ = file.Close()
}
}()

// Get file info to create tar header
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to get file info: %w", err)
}

// Create tar header
header := &tar.Header{
Name: tarPath, // Name in the tar archive
Size: fileInfo.Size(), // File size
Mode: int64(fileInfo.Mode()), // File mode
ModTime: fileInfo.ModTime(), // Modification time
}

// Write header to the tar
err = tw.WriteHeader(header)
if err != nil {
return fmt.Errorf("failed to write tar header: %w", err)
}

// Stream the file content to the tar
_, err = io.Copy(tw, file)
if err != nil {
return fmt.Errorf("failed to copy file data to tar: %w", err)
}

return nil
}

// getRemoteFileSize returns the size of a file within a container so that we can stream its contents
func getRemoteFileSize(config *rest.Config,
namespace string, podName string, containerName string, filePath string) (int64, error) {

podExec, err := util.NewPodExecutor(config)
if err != nil {
return 0, fmt.Errorf("could not create executor: %w", err)
}
exec := func(stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
return podExec(namespace, podName, containerName,
stdin, stdout, stderr, command...)
}

// Prepare the command to get the file size using "stat -c %s <file>"
command := fmt.Sprintf("stat -c %s %s", "%s", filePath)
stdout, stderr, err := Executor(exec).bashCommand(command)
if err != nil {
return 0, fmt.Errorf("could not get file size: %w, stderr: %s", err, stderr)
}

// Parse the file size from stdout
size, err := strconv.ParseInt(strings.TrimSpace(stdout), 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse file size: %w", err)
}

return size, nil
}

// convertBytes converts a byte size (int64) into a human-readable format.
func convertBytes(bytes int64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := int64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}

0 comments on commit edbf76e

Please sign in to comment.