Skip to content

Commit

Permalink
Small improvements in staging in kubeconsumer
Browse files Browse the repository at this point in the history
1. If a dest file already exists, don't overwrite it, but
instead print a warning.
2. Start monitoring the log file earlier to avoid error
messages about task not existing
3. Add a message to the log saying
when sparkles is downloading files
to make it obvious what we're waiting
for.
  • Loading branch information
pgm committed Oct 30, 2023
1 parent d5cb914 commit 8654678
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions go/src/github.com/broadinstitute/kubequeconsume/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func downloadAll(ioc IOClient, workdir string, downloads []*TaskDownload, cacheD
srcURL := dl.SrcURL
destination := path.Join(workdir, dl.Dst)

if _, err := os.Stat(destination); err == nil {
log.Printf("Skipping download of %s -> %s because file already exists.", srcURL, destination)
continue
}

//parentDir := strings.ToLower(path.Base(path.Dir(srcURL)))
if dl.IsCASKey {
casKey := path.Base(srcURL)
Expand All @@ -120,6 +125,7 @@ func downloadAll(ioc IOClient, workdir string, downloads []*TaskDownload, cacheD
cacheDest := path.Join(cacheDir, casKey)

if _, err := os.Stat(cacheDest); os.IsNotExist(err) {
log.Printf("Downloading %s -> %s", srcURL, destination)
err = ioc.Download(srcURL, cacheDest)
if err != nil {
return err, downloaded
Expand Down Expand Up @@ -160,6 +166,7 @@ func downloadAll(ioc IOClient, workdir string, downloads []*TaskDownload, cacheD
}

} else {
log.Printf("Downloading %s -> %s", srcURL, destination)
err := ioc.Download(srcURL, destination)
if err != nil {
return err, downloaded
Expand Down Expand Up @@ -370,11 +377,28 @@ func executeTaskInDir(ioc IOClient, workdir string, taskId string, spec *TaskSpe
stdoutPath := path.Join(workdir, "stdout.txt")
execLifecycleScript("PreDownloadScript", workdir, spec.PreDownloadScript)

stdout, err := os.OpenFile(stdoutPath, os.O_WRONLY|os.O_CREATE, 0766)
if err != nil {
return "", err
}

if monitor != nil {
monitor.StartWatchingLog(taskId, stdoutPath)
}

if len(spec.Downloads) > 0 {
stdout.WriteString(fmt.Sprintf("sparkles: Downloading %d files...\n", len(spec.Downloads)))
}

err, downloaded := downloadAll(ioc, workdir, spec.Downloads, cachedir)
if err != nil {
return "", err
}

if len(spec.Downloads) > 0 {
stdout.WriteString(fmt.Sprintf("sparkles: download complete.\n"))
}

downloadedInitialMTimes := getModificationTimes(downloaded)

execLifecycleScript("PostDownloadScript", workdir, spec.PostDownloadScript)
Expand All @@ -387,15 +411,6 @@ func executeTaskInDir(ioc IOClient, workdir string, taskId string, spec *TaskSpe
panic("bad commandWorkingDir")
}

stdout, err := os.OpenFile(stdoutPath, os.O_WRONLY|os.O_CREATE, 0766)
if err != nil {
return "", err
}

if monitor != nil {
monitor.StartWatchingLog(taskId, stdoutPath)
}

cwdDir := path.Join(workdir, commandWorkingDir)
log.Printf("Executing (working dir: %s, output written to: %s): %s", cwdDir, stdoutPath, spec.Command)
resourceUsage, retcode, err := execCommand(spec.Command, cwdDir, stdout)
Expand Down

0 comments on commit 8654678

Please sign in to comment.