Skip to content

Commit

Permalink
break up command into functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Consolethinks committed Sep 5, 2024
1 parent 2552445 commit ce93248
Showing 1 changed file with 111 additions and 88 deletions.
199 changes: 111 additions & 88 deletions cmd/commands/globusCheckTransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/SwissOpenEM/globus"
"github.com/fatih/color"
"github.com/paulscherrerinstitute/scicat/cmd/cliutils"
"github.com/paulscherrerinstitute/scicat/datasetIngestor"
Expand Down Expand Up @@ -144,98 +145,15 @@ For further help see "` + MANUAL + `"`,
// go through each transfer task, and execute the requested operations
var archivableDatasetList []string
for _, taskId := range args {
task, err := globusClient.TransferGetTaskByID(taskId)
if err != nil {
log.Printf("Transfer task with ID \"%s\" returned error: %v\n", taskId, err)
continue
}
fmt.Printf("Task status: \n=====\n%v\n=====\n", task)

// if marking as archivable is requested and the transfer has succeeded
if markArchivable && task.Status == "SUCCEEDED" {
if task.SourceBasePath == nil {
log.Printf("Can't get source base path for task \"%s\". It will not be marked as archivable, but can probably be archived.\n", taskId)
continue
}

// get source and dest folders
sourceFolder := *task.SourceBasePath
sourceFolder = strings.TrimPrefix(sourceFolder, gConfig.SourcePrefixPath)
sourceFolder = strings.TrimSuffix(sourceFolder, "/")
var destFolder string
if !skipDestPathCheck {
if task.DestinationBasePath == nil {
log.Printf("Can't get destination base path for task \"%s\". It will not be marked as archivable, but can probably be archived.\n", taskId)
continue
}
destFolder = *task.DestinationBasePath
}

list, err := datasetIngestor.TestForExistingSourceFolder([]string{sourceFolder}, client, APIServer, user["accessToken"])

// error handling and exceptions
if err != nil {
log.Printf("WARNING - an error has occurred when querying the sourcefolder \"%s\" of task id \"%s\": %v\n", sourceFolder, taskId, err)
log.Printf("Can't set %s task's dataset to archivable.\n", taskId)
continue
}
if len(list) <= 0 {
log.Printf("WARNING - empty dataset list returned for the sourcefolder \"%s\" of task id \"%s\": %v\n", sourceFolder, taskId, err)
log.Printf("Can't set %s task's dataset to archivable.\n", taskId)
continue
}
if dryRun {
log.Println("list of found datasets:")
for _, result := range list {
fmt.Printf(" - %s\n", result.Pid)
}
log.Println("since dry-run is set, the command will not attempt to mark the above datasets as archivable, or try to archive them")
continue
}

for _, result := range list {
if !skipDestPathCheck {
separatedPid := strings.Split(result.Pid, "/")
if len(separatedPid) != 2 {
log.Printf("\"%s\" dataset has irregular PID. Cannot check destFolder with it. Skipping...\n", result.Pid)
continue
}
shortPid := separatedPid[1]
if !strings.Contains(destFolder, shortPid) {
log.Printf("\"%s\" dataset's PID does not appear in the destination folder (\"%s\"). Cannot mark it as archivable. Skipping...\n", result.Pid, destFolder)
continue
}
}
log.Printf("%s dataset is being marked as archivable...\n", result.Pid)
err := datasetIngestor.MarkFilesReady(client, APIServer, result.Pid, user)
if err != nil {
log.Printf("WARNING - error occurred while trying to mark files ready for dataset with PID \"%s\": %v\n", result.Pid, err)
log.Printf("%s dataset was (likely) not marked archivable.\n", result.Pid)
continue
}
log.Printf("%s dataset was successfully marked as archivable.\n", result.Pid)
archivableDatasetList = append(archivableDatasetList, result.Pid)
}
}

// if marking as archivable is requested but the transfer has *not* succeeded
if markArchivable && task.Status != "SUCCEEDED" {
log.Printf("%s task's status is %s, the corresponding dataset can't be marked as archivable.\n", taskId, task.Status)
}
archivableDatasetList = append(
archivableDatasetList,
globusCheckTransferHandleTransferTask(globusClient, taskId, markArchivable, gConfig, skipDestPathCheck, dryRun, client, APIServer, user)...,
)
}

// === create archive job ===
if autoarchiveFlag {
log.Printf("Submitting Archive Job for archivable datasets.\n")
// TODO: change param type from pointer to regular as it is unnecessary
// for it to be passed as pointer
jobId, err := datasetUtils.CreateArchivalJob(client, APIServer, user, archivableDatasetList, &tapecopies)
if err != nil {
color.Set(color.FgRed)
log.Printf("Could not create the archival job for the ingested datasets: %s", err.Error())
color.Unset()
}
log.Println("Submitted job:", jobId)
globusCheckTransferCreateArchiveJob(client, APIServer, user, archivableDatasetList, tapecopies)
}
},
}
Expand All @@ -258,3 +176,108 @@ func init() {
globusCheckTransfer.MarkFlagsMutuallyExclusive("dry-run", "autoarchive")
globusCheckTransfer.MarkFlagsMutuallyExclusive("dry-run", "tapecopies")
}

func globusCheckTransferCreateArchiveJob(client *http.Client, APIServer string, user map[string]string, archivableDatasetList []string, tapecopies int) {
log.Printf("Submitting Archive Job for archivable datasets.\n")
// TODO: change param type from pointer to regular as it is unnecessary
// for it to be passed as pointer
jobId, err := datasetUtils.CreateArchivalJob(client, APIServer, user, archivableDatasetList, &tapecopies)
if err != nil {
color.Set(color.FgRed)
log.Printf("Could not create the archival job for the ingested datasets: %s", err.Error())
color.Unset()
}
log.Println("Submitted job:", jobId)
}

func globusCheckTransferHandleTransferTask(
globusClient globus.GlobusClient,
taskId string, markArchivable bool,
gConfig cliutils.GlobusConfig,
skipDestPathCheck bool,
dryRun bool,
client *http.Client,
APIServer string,
user map[string]string,
) (archivableDatasetList []string) {
task, err := globusClient.TransferGetTaskByID(taskId)
if err != nil {
log.Printf("Transfer task with ID \"%s\" returned error: %v\n", taskId, err)
return
}
fmt.Printf("Task status: \n=====\n%v\n=====\n", task)

// if marking as archivable is requested and the transfer has succeeded
if markArchivable && task.Status == "SUCCEEDED" {
if task.SourceBasePath == nil {
log.Printf("Can't get source base path for task \"%s\". It will not be marked as archivable, but can probably be archived.\n", taskId)
return []string{}
}

// get source and dest folders
sourceFolder := *task.SourceBasePath
sourceFolder = strings.TrimPrefix(sourceFolder, gConfig.SourcePrefixPath)
sourceFolder = strings.TrimSuffix(sourceFolder, "/")
var destFolder string
if !skipDestPathCheck {
if task.DestinationBasePath == nil {
log.Printf("Can't get destination base path for task \"%s\". It will not be marked as archivable, but can probably be archived.\n", taskId)
return []string{}
}
destFolder = *task.DestinationBasePath
}

list, err := datasetIngestor.TestForExistingSourceFolder([]string{sourceFolder}, client, APIServer, user["accessToken"])

// error handling and exceptions
if err != nil {
log.Printf("WARNING - an error has occurred when querying the sourcefolder \"%s\" of task id \"%s\": %v\n", sourceFolder, taskId, err)
log.Printf("Can't set %s task's dataset to archivable.\n", taskId)
return []string{}
}
if len(list) <= 0 {
log.Printf("WARNING - empty dataset list returned for the sourcefolder \"%s\" of task id \"%s\": %v\n", sourceFolder, taskId, err)
log.Printf("Can't set %s task's dataset to archivable.\n", taskId)
return []string{}
}
if dryRun {
log.Println("list of found datasets:")
for _, result := range list {
fmt.Printf(" - %s\n", result.Pid)
}
log.Println("since dry-run is set, the command will not attempt to mark the above datasets as archivable, or try to archive them")
return []string{}
}

for _, result := range list {
if !skipDestPathCheck {
separatedPid := strings.Split(result.Pid, "/")
if len(separatedPid) != 2 {
log.Printf("\"%s\" dataset has irregular PID. Cannot check destFolder with it. Skipping...\n", result.Pid)
continue
}
shortPid := separatedPid[1]
if !strings.Contains(destFolder, shortPid) {
log.Printf("\"%s\" dataset's PID does not appear in the destination folder (\"%s\"). Cannot mark it as archivable. Skipping...\n", result.Pid, destFolder)
continue
}
}
log.Printf("%s dataset is being marked as archivable...\n", result.Pid)
err := datasetIngestor.MarkFilesReady(client, APIServer, result.Pid, user)
if err != nil {
log.Printf("WARNING - error occurred while trying to mark files ready for dataset with PID \"%s\": %v\n", result.Pid, err)
log.Printf("%s dataset was (likely) not marked archivable.\n", result.Pid)
continue
}
log.Printf("%s dataset was successfully marked as archivable.\n", result.Pid)
archivableDatasetList = append(archivableDatasetList, result.Pid)
}
}

// if marking as archivable is requested but the transfer has *not* succeeded
if markArchivable && task.Status != "SUCCEEDED" {
log.Printf("%s task's status is %s, the corresponding dataset can't be marked as archivable.\n", taskId, task.Status)
}

return archivableDatasetList
}

0 comments on commit ce93248

Please sign in to comment.