diff --git a/cmd/put.go b/cmd/put.go index 9f279f5..96d943c 100644 --- a/cmd/put.go +++ b/cmd/put.go @@ -52,6 +52,8 @@ const ( putMetaParts = 2 putSet = "manual" + numIRODSConnections = 2 + // minMBperSecondUploadSpeed is the slowest MB/s we think an upload should // take; if it drops below this and is still uploading, we'll report to the // server the upload might be stuck. @@ -184,6 +186,11 @@ func handleServerMode(started time.Time) { os.Exit(0) } + err = client.MakingIRODSConnections(numIRODSConnections) + if err != nil { + die(err.Error()) + } + uploadStarts, uploadResults, skipResults, dfunc := handlePut(client, requests) err = client.SendPutResultsToServer(uploadStarts, uploadResults, skipResults, @@ -191,6 +198,11 @@ func handleServerMode(started time.Time) { dfunc() + errm := client.ClosedIRODSConnections() + if errm != nil { + warn(errm.Error()) + } + if err != nil { warn("%s", err) diff --git a/cmd/server.go b/cmd/server.go index 22a56cd..1f09d0a 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -152,13 +152,18 @@ database that you've made, to investigate. warn("ldap options not supplied, will assume all user passwords are correct!") } + logWriter := setServerLogger(serverLogPath) token := os.Getenv("IBACKUP_SLACK_TOKEN") channel := os.Getenv("IBACKUP_SLACK_CHANNEL") var slacker set.Slacker if token != "" && channel != "" { - slacker = slack.New(slack.Config{Token: token, Channel: channel}) + slacker = slack.New(slack.Config{ + Token: token, + Channel: channel, + ErrorLogger: logWriter, + }) } else { if serverStillRunningMsgFreq != "" { die("--still_running requires slack variables") @@ -179,8 +184,6 @@ database that you've made, to investigate. } } - logWriter := setServerLogger(serverLogPath) - conf := server.Config{ HTTPLogger: logWriter, Slacker: slacker, diff --git a/cmd/status.go b/cmd/status.go index 55f032b..0c0a991 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -220,8 +220,8 @@ func status(client *server.Client, sf statusFilterer, user, name string, details func displayQueueStatus(qs *server.QStatus) { info("Global put queue status: %d queued; %d reserved to be worked on; %d failed", qs.Total, qs.Reserved, qs.Failed) - info("Global put client status (/%d): %d creating collections; %d currently uploading", - numPutClients, qs.CreatingCollections, qs.Uploading) + info("Global put client status (/%d): %d iRODS connections; %d creating collections; %d currently uploading", + numPutClients, qs.IRODSConnections, qs.CreatingCollections, qs.Uploading) if qs.Stuck != nil { if gasClientCLI(serverURL, serverCert).CanReadServerToken() { @@ -336,10 +336,10 @@ func displaySet(s *set.Set, showRequesters bool) { //nolint:funlen,gocyclo } cliPrint("Discovery: %s\n", s.Discovered()) - cliPrint("Num files: %s; Symlinks: %d; Hardlinks: %d; Size files: %s\n", - s.Count(), s.Symlinks, s.Hardlinks, s.Size()) - cliPrint("Uploaded: %d; Failed: %d; Missing: %d; Abnormal: %d\n", - s.Uploaded, s.Failed, s.Missing, s.Abnormal) + cliPrint("Num files: %s; Symlinks: %d; Hardlinks: %d; Size (total/recently uploaded): %s / %s\n", + s.Count(), s.Symlinks, s.Hardlinks, s.Size(), s.UploadedSize()) + cliPrint("Uploaded: %d; Replaced: %d; Skipped: %d; Failed: %d; Missing: %d; Abnormal: %d\n", + s.Uploaded, s.Replaced, s.Skipped, s.Failed, s.Missing, s.Abnormal) switch s.Status { case set.Complete: @@ -406,7 +406,7 @@ func determineETADetailsFromSize(s *set.Set) (basedOn, unit string, total, done remaining, speed float64, timeUnit time.Duration) { basedOn = "last completed size" total = s.LastCompletedSize - done = s.SizeFiles + done = s.SizeTotal remaining = bytesToMB(total - done) if done == 0 { diff --git a/main_test.go b/main_test.go index dd7ebf4..f60a805 100644 --- a/main_test.go +++ b/main_test.go @@ -54,7 +54,7 @@ const app = "ibackup" const userPerms = 0700 const noBackupSets = `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading no backup sets` var errTwoBackupsNotSeen = errors.New("2 backups were not seen") @@ -107,6 +107,8 @@ func (s *TestServer) prepareFilePaths(dir string) { "HOME=" + home, "IRODS_ENVIRONMENT_FILE=" + os.Getenv("IRODS_ENVIRONMENT_FILE"), "GEM_HOME=" + os.Getenv("GEM_HOME"), + "IBACKUP_SLACK_TOKEN=" + os.Getenv("IBACKUP_SLACK_TOKEN"), + "IBACKUP_SLACK_CHANNEL=" + os.Getenv("IBACKUP_SLACK_CHANNEL"), } } @@ -381,12 +383,7 @@ func TestMain(m *testing.M) { } } - remotePath := os.Getenv("IBACKUP_TEST_COLLECTION") - if remotePath == "" { - return - } - - exec.Command("irm", "-r", remotePath).Run() //nolint:errcheck + resetIRODS() } func buildSelf() func() { @@ -399,6 +396,15 @@ func buildSelf() func() { return func() { os.Remove(app) } } +func resetIRODS() { + remotePath := os.Getenv("IBACKUP_TEST_COLLECTION") + if remotePath == "" { + return + } + + exec.Command("irm", "-r", remotePath).Run() //nolint:errcheck +} + func failMainTest(err string) { fmt.Println(err) //nolint:forbidigo } @@ -428,15 +434,15 @@ func TestStatus(t *testing.T) { Convey("Status tells you where input directories would get uploaded to", func() { s.confirmOutput(t, []string{"status"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testAdd Transformer: `+transformer+` Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+localDir+" => "+remoteDir) @@ -461,15 +467,15 @@ Directories: Convey("Status tells you an example of where input files would get uploaded to", func() { s.confirmOutput(t, []string{"status", "--name", "testAddFiles"}, 0, `Global put queue status: 2 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testAddFiles Transformer: prefix=`+dir+`:/remote Monitored: false; Archive: false Status: complete Discovery: -Num files: 2; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 2; Abnormal: 0 +Num files: 2; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 2; Abnormal: 0 Completed in: 0s Example File: `+dir+`/path/to/other/file => /remote/path/to/other/file`) }) @@ -480,15 +486,15 @@ Example File: `+dir+`/path/to/other/file => /remote/path/to/other/file`) s.addSetForTesting(t, "badHumgen", "humgen", localDir) expected := `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: badHumgen Transformer: humgen Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: your transformer didn't work: not a valid humgen lustre path [` + localDir + `/file.txt] @@ -514,15 +520,15 @@ your transformer didn't work: not a valid humgen lustre path [` + localDir + `/f s.confirmOutput(t, []string{"status", "-n", "oddPrefix"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: oddPrefix Transformer: `+transformer+` Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+localDir+toRemote+localDir) @@ -546,7 +552,7 @@ Directories: s.confirmOutput(t, []string{"status", "-n", "badPerms"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: badPerms Transformer: `+transformer+` @@ -554,8 +560,8 @@ Monitored: false; Archive: false Status: complete Warning: open `+badPermDir+`: permission denied Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+localDir+" => "+remote) @@ -575,15 +581,15 @@ Directories: s.confirmOutput(t, []string{"status", "-n", "humgenSet"}, 0, `Global put queue status: 1 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: humgenSet Transformer: humgen Monitored: false; Archive: false Status: pending upload Discovery: -Num files: 1; Symlinks: 0; Hardlinks: 0; Size files: 0 B (and counting) -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 1; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B (and counting) / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Example File: `+humgenFile+" => /humgen/teams/hgi/scratch125/mercury/ibackup/file_for_testsuite.do_not_delete") }) @@ -601,15 +607,15 @@ Example File: `+humgenFile+" => /humgen/teams/hgi/scratch125/mercury/ibackup/fil s.confirmOutput(t, []string{"status", "-n", "gengenSet"}, 0, `Global put queue status: 1 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: gengenSet Transformer: gengen Monitored: false; Archive: false Status: pending upload Discovery: -Num files: 1; Symlinks: 0; Hardlinks: 0; Size files: 0 B (and counting) -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 1; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B (and counting) / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Example File: `+gengenFile+" => /humgen/gengen/teams/hgi/scratch126/mercury/ibackup/file_for_testsuite.do_not_delete") }) @@ -638,15 +644,15 @@ Example File: `+gengenFile+" => /humgen/gengen/teams/hgi/scratch126/mercury/ibac s.confirmOutput(t, []string{"status", "--name", "testLinks"}, 0, `Global put queue status: 4 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testLinks Transformer: prefix=`+dir+`:/remote Monitored: false; Archive: false Status: pending upload Discovery: -Num files: 4; Symlinks: 2; Hardlinks: 1; Size files: 0 B (and counting) -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 4; Symlinks: 2; Hardlinks: 1; Size (total/recently uploaded): 0 B (and counting) / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Directories: `+dir+toRemote) }) @@ -684,7 +690,7 @@ Directories: s.confirmOutput(t, []string{"status", "--user", "all"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: setForRequesterPrinting Requester: `+currentUserName+` @@ -692,8 +698,8 @@ Transformer: `+transformer+` Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+local+" => "+remote) @@ -714,15 +720,15 @@ Directories: s.confirmOutput(t, []string{"status", "--name", "testAddFifo", "--details"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testAddFifo Transformer: prefix=`+dir+`:/remote Monitored: false; Archive: false Status: complete Discovery: -Num files: 1; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 1 +Num files: 1; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 1 Completed in: 0s Example File: `+dir+`/fifo => /remote/fifo @@ -739,15 +745,15 @@ Path Status Size Attempts Date Error s.confirmOutput(t, []string{"status", "--name", "testAddFifoDir"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testAddFifoDir Transformer: prefix=`+dir+`:/remote Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+dir+toRemote) @@ -918,15 +924,15 @@ func TestBackup(t *testing.T) { bs.confirmOutput(t, []string{ "status", "-n", "testForBackup"}, 0, `Global put queue status: 0 queued; 0 reserved to be worked on; 0 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 0 iRODS connections; 0 creating collections; 0 currently uploading Name: testForBackup Transformer: `+transformer+` Monitored: false; Archive: false Status: complete Discovery: -Num files: 0; Symlinks: 0; Hardlinks: 0; Size files: 0 B -Uploaded: 0; Failed: 0; Missing: 0; Abnormal: 0 +Num files: 0; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 0 B / 0 B +Uploaded: 0; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0 Completed in: 0s Directories: `+localDir+` => `+remoteDir) @@ -973,7 +979,69 @@ func TestPuts(t *testing.T) { path := t.TempDir() transformer := "prefix=" + path + ":" + remotePath - Convey("Putting a set with hardlinks uploads an empty file and special inode file", func() { + Convey("Repeatedly uploading files that are changed or not changes status details", func() { + file1 := filepath.Join(path, "file1") + file2 := filepath.Join(path, "file2") + file3 := filepath.Join(path, "file3") + + internal.CreateTestFile(t, file1, "some data1") + internal.CreateTestFile(t, file2, "some data2") + internal.CreateTestFile(t, file3, "some data3") + + setName := "changingFilesTest" + s.addSetForTesting(t, setName, transformer, path) + + statusCmd := []string{"status", "--name", setName} + + s.waitForStatus(setName, "\nStatus: uploading", 60*time.Second) + s.confirmOutputContains(t, statusCmd, 0, + `Global put queue status: 3 queued; 3 reserved to be worked on; 0 failed +Global put client status (/10): 6 iRODS connections`) + + s.waitForStatus(setName, "\nStatus: complete", 60*time.Second) + + s.confirmOutputContains(t, statusCmd, 0, + "Uploaded: 3; Replaced: 0; Skipped: 0; Failed: 0; Missing: 0; Abnormal: 0") + s.confirmOutputContains(t, statusCmd, 0, + "Num files: 3; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 30 B / 30 B") + + s.confirmOutputContains(t, statusCmd, 0, "") + + newName := setName + ".v2" + statusCmd[2] = newName + + s.addSetForTesting(t, newName, transformer, path) + s.waitForStatus(newName, "\nStatus: complete", 60*time.Second) + s.confirmOutputContains(t, statusCmd, 0, + "Uploaded: 0; Replaced: 0; Skipped: 3; Failed: 0; Missing: 0; Abnormal: 0") + s.confirmOutputContains(t, statusCmd, 0, + "Num files: 3; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 30 B / 0 B") + + newName = setName + ".v3" + statusCmd[2] = newName + + internal.CreateTestFile(t, file2, "some data2 updated") + + s.addSetForTesting(t, newName, transformer, path) + s.waitForStatus(newName, "\nStatus: complete", 60*time.Second) + s.confirmOutputContains(t, statusCmd, 0, + "Uploaded: 0; Replaced: 1; Skipped: 2; Failed: 0; Missing: 0; Abnormal: 0") + s.confirmOutputContains(t, statusCmd, 0, + "Num files: 3; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 38 B / 18 B") + + internal.CreateTestFile(t, file2, "less data") + exitCode, _ := s.runBinary(t, "retry", "--name", newName, "-a") + So(exitCode, ShouldEqual, 0) + + s.waitForStatus(newName, "\nStatus: complete", 60*time.Second) + s.confirmOutputContains(t, statusCmd, 0, + "Uploaded: 0; Replaced: 1; Skipped: 2; Failed: 0; Missing: 0; Abnormal: 0") + s.confirmOutputContains(t, statusCmd, 0, + "Num files: 3; Symlinks: 0; Hardlinks: 0; Size (total/recently uploaded): 29 B / 9 B") + }) + + // TODO: re-enable once hardlinks metamod bug fixed + SkipConvey("Putting a set with hardlinks uploads an empty file and special inode file", func() { file := filepath.Join(path, "file") link1 := filepath.Join(path, "hardlink1") link2 := filepath.Join(path, "hardlink2") @@ -992,6 +1060,7 @@ func TestPuts(t *testing.T) { s.addSetForTesting(t, "hardlinkTest", transformer, path) + s.waitForStatus("hardlinkTest", "\nStatus: uploading", 60*time.Second) s.waitForStatus("hardlinkTest", "\nStatus: complete", 60*time.Second) output := getRemoteMeta(remoteFile) @@ -1042,7 +1111,7 @@ func TestPuts(t *testing.T) { s.waitForStatus(setName, statusLine, 30*time.Second) expected := `Global put queue status: 1 queued; 0 reserved to be worked on; 1 failed -Global put client status (/10): 0 creating collections; 0 currently uploading +Global put client status (/10): 2 iRODS connections; 0 creating collections; 0 currently uploading no backup sets` s.confirmOutput(t, []string{"status", "-c"}, 0, expected) s.confirmOutput(t, []string{"status", "-q"}, 0, expected) @@ -1076,6 +1145,8 @@ func getRemoteMeta(path string) string { } func TestManualMode(t *testing.T) { + resetIRODS() + Convey("when using a manual put command, files are uploaded correctly", t, func() { remotePath := os.Getenv("IBACKUP_TEST_COLLECTION") if remotePath == "" { diff --git a/server/queue.go b/server/queue.go index 5e1555b..9103814 100644 --- a/server/queue.go +++ b/server/queue.go @@ -31,11 +31,13 @@ import ( "fmt" "net/http" "os" + "strconv" "github.com/VertebrateResequencing/wr/queue" "github.com/gin-gonic/gin" gas "github.com/wtsi-hgi/go-authserver" "github.com/wtsi-hgi/ibackup/put" + "github.com/wtsi-hgi/ibackup/slack" ) const ( @@ -44,8 +46,11 @@ const ( queueUploadingPath = "/uploading" queueAllPath = "/allrequests" queueCollCreationPath = "/collcreation" + iRODSConnectionPath = "/irodsconnection" paramHostPID = "hostpid" + ErrNoConnectionsNumber = gas.Error("nconnections must be provided") + // EndPointAuthQueueStatus is the endpoint for getting queue status. EndPointAuthQueueStatus = gas.EndPointAuth + queueStatusPath @@ -64,6 +69,10 @@ const ( // EndPointAuthQueueCollCreation is the endpoint for telling the server // when you start and stop creating collections. EndPointAuthQueueCollCreation = gas.EndPointAuth + queueCollCreationPath + + // EndPointAuthIRODSConnection is the endpoint for telling the server + // that a client created or closed its connections. + EndPointAuthIRODSConnection = gas.EndPointAuth + iRODSConnectionPath ) // MakeQueueEndPoints adds a number of endpoints to the REST API for working @@ -116,6 +125,9 @@ func (s *Server) MakeQueueEndPoints() error { authGroup.POST(queueCollCreationPath+hostPIDParam, s.clientStartedCreatingCollections) authGroup.DELETE(queueCollCreationPath+hostPIDParam, s.clientStoppedCreatingCollections) + authGroup.POST(iRODSConnectionPath+hostPIDParam, s.clientMadeIRODSConnections) + authGroup.DELETE(iRODSConnectionPath+hostPIDParam, s.clientClosedIRODSConnections) + return nil } @@ -131,6 +143,7 @@ func (s *Server) getQueueStatus(c *gin.Context) { type QStatus struct { Total int Reserved int + IRODSConnections int CreatingCollections int Uploading int Failed int @@ -156,16 +169,30 @@ func (s *Server) QueueStatus() *QStatus { stuck := s.uploadTracker.currentlyStuck() + iRODSConnectionsNumber := s.totalIRODSConnections() + return &QStatus{ Total: stats.Items, Reserved: stats.Running, CreatingCollections: len(s.creatingCollections), + IRODSConnections: iRODSConnectionsNumber, Uploading: s.uploadTracker.numUploading(), Failed: stats.Buried, Stuck: stuck, } } +// totalIRODSConnections requires you have the mapMu lock before calling. +func (s *Server) totalIRODSConnections() int { + var totalConnections int + + for _, v := range s.iRODSConnections { + totalConnections += v + } + + return totalConnections +} + // getBuried gets the server's BuriedRequests. // // MakeQueueEndPoints() must already have been called. This is called when there @@ -515,3 +542,87 @@ func (c *Client) FinishedCreatingCollections() error { return responseToErr(resp) } + +// MakingIRODSConnections tells the server you've started to create iRODS +// connections. Be sure to defer ClosedIRODSConnections(). +func (c *Client) MakingIRODSConnections(number int) error { + hostPID, err := hostPID() + if err != nil { + return err + } + + resp, err := c.request().Post(EndPointAuthIRODSConnection + "/" + hostPID + "?nconnections=" + strconv.Itoa(number)) + if err != nil { + return err + } + + return responseToErr(resp) +} + +// ClosedIRODSConnections tells the server you've closed some iRODS connections +// following a MakingIRODSConnections call. +func (c *Client) ClosedIRODSConnections() error { + hostPID, err := hostPID() + if err != nil { + return err + } + + resp, err := c.request().Delete(EndPointAuthIRODSConnection + "/" + hostPID) + if err != nil { + return err + } + + return responseToErr(resp) +} + +// clientMadeIRODSConnections notes that a client created some iRODS connections. +// +// MakeQueueEndPoints() must already have been called. This is called when there +// is a POST on /rest/v1/auth/irodsconnection. +func (s *Server) clientMadeIRODSConnections(c *gin.Context) { + hostPID, status, err := s.extractHostPIDForCreatingCollectionsEndpoints(c) + if err != nil { + c.AbortWithError(status, err) //nolint:errcheck + + return + } + + numberOfConnections := c.Query("nconnections") + if numberOfConnections == "" { + c.AbortWithError(http.StatusBadRequest, ErrNoConnectionsNumber) //nolint:errcheck + + return + } + + n, err := strconv.Atoi(numberOfConnections) + if err != nil { + c.AbortWithError(http.StatusBadRequest, err) //nolint:errcheck + + return + } + + s.mapMu.Lock() + defer s.mapMu.Unlock() + + s.iRODSConnections[hostPID] += n + s.slacker.SendMessage(slack.Info, strconv.Itoa(s.totalIRODSConnections())+" iRODS connections open") + + c.Status(http.StatusOK) +} + +func (s *Server) clientClosedIRODSConnections(c *gin.Context) { + hostPID, status, err := s.extractHostPIDForCreatingCollectionsEndpoints(c) + if err != nil { + c.AbortWithError(status, err) //nolint:errcheck + + return + } + + s.mapMu.Lock() + defer s.mapMu.Unlock() + + delete(s.iRODSConnections, hostPID) + s.slacker.SendMessage(slack.Info, strconv.Itoa(s.totalIRODSConnections())+" iRODS connections open") + + c.Status(http.StatusOK) +} diff --git a/server/server.go b/server/server.go index 658a567..d896e94 100644 --- a/server/server.go +++ b/server/server.go @@ -110,6 +110,7 @@ type Server struct { mapMu sync.RWMutex creatingCollections map[string]bool + iRODSConnections map[string]int } // New creates a Server which can serve a REST API and website. @@ -127,6 +128,7 @@ func New(conf Config) (*Server, error) { dirPool: workerpool.New(workerPoolSizeDir), queue: queue.New(context.Background(), "put"), creatingCollections: make(map[string]bool), + iRODSConnections: make(map[string]int), slacker: conf.Slacker, stillRunningMsgFreq: conf.StillRunningMsgFreq, uploadTracker: newUploadTracker(conf.Slacker, conf.SlackMessageDebounce), diff --git a/server/server_test.go b/server/server_test.go index 0651940..85e8f44 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1374,8 +1374,10 @@ func TestServer(t *testing.T) { So(err, ShouldBeNil) So(gotSet.Status, ShouldEqual, set.Complete) So(gotSet.NumFiles, ShouldEqual, 6) - So(gotSet.SizeFiles, ShouldEqual, 15) - So(gotSet.Uploaded, ShouldEqual, 5) + So(gotSet.SizeTotal, ShouldEqual, 15) + So(gotSet.Uploaded, ShouldEqual, 3) + So(gotSet.Replaced, ShouldEqual, 1) + So(gotSet.Skipped, ShouldEqual, 1) So(gotSet.Failed, ShouldEqual, 1) So(gotSet.Symlinks, ShouldEqual, 1) So(gotSet.Missing, ShouldEqual, 0) @@ -1612,9 +1614,24 @@ func TestServer(t *testing.T) { So(gotSet.Uploaded, ShouldEqual, 0) So(gotSet.Error, ShouldBeBlank) + qs, errg := client.GetQueueStatus() + So(errg, ShouldBeNil) + So(qs.IRODSConnections, ShouldEqual, 0) + + slackWriter.Reset() + + err = client.MakingIRODSConnections(2) + So(err, ShouldBeNil) + + So(slackWriter.String(), ShouldEqual, slack.BoxPrefixInfo+"2 iRODS connections open") + p, d := makePutter(t, handler, requests, client) defer d() + qs, err = client.GetQueueStatus() + So(err, ShouldBeNil) + So(qs.IRODSConnections, ShouldEqual, 2) + uploadStarts, uploadResults, skippedResults := p.Put() err = client.SendPutResultsToServer(uploadStarts, uploadResults, skippedResults, @@ -1628,6 +1645,24 @@ func TestServer(t *testing.T) { So(gotSet.Uploaded, ShouldEqual, len(discovers)) So(gotSet.Symlinks, ShouldEqual, 1) + slackWriter.Reset() + + err = client.MakingIRODSConnections(2) + So(err, ShouldBeNil) + + So(slackWriter.String(), ShouldEqual, slack.BoxPrefixInfo+"4 iRODS connections open") + + slackWriter.Reset() + + err = client.ClosedIRODSConnections() + So(err, ShouldBeNil) + + qs, err = client.GetQueueStatus() + So(err, ShouldBeNil) + So(qs.IRODSConnections, ShouldEqual, 0) + + So(slackWriter.String(), ShouldEqual, slack.BoxPrefixInfo+"0 iRODS connections open") + Convey("After completion, re-discovery can find new files and we can re-complete", func() { newFile := filepath.Join(dirs[0], "new") internal.CreateTestFileOfLength(t, newFile, 2) @@ -1668,7 +1703,9 @@ func TestServer(t *testing.T) { So(err, ShouldBeNil) So(gotSet.Status, ShouldEqual, set.Complete) So(gotSet.NumFiles, ShouldEqual, expectedNumFiles) - So(gotSet.Uploaded, ShouldEqual, expectedNumFiles) + So(gotSet.Uploaded, ShouldEqual, 1) + So(gotSet.Replaced, ShouldEqual, 0) + So(gotSet.Skipped, ShouldEqual, expectedNumFiles-1) So(gotSet.Symlinks, ShouldEqual, 1) entries, errg := client.GetFiles(exampleSet.ID()) @@ -1676,7 +1713,7 @@ func TestServer(t *testing.T) { So(len(entries), ShouldEqual, expectedNumFiles) for n, entry := range entries { - So(entry.Status, ShouldEqual, set.Uploaded) + So(entry.Status == set.Uploaded || entry.Status == set.Skipped, ShouldBeTrue) if n == 3 { So(entry.Type, ShouldEqual, set.Symlink) @@ -1705,7 +1742,7 @@ func TestServer(t *testing.T) { } } - So(gotSet.SizeFiles, ShouldEqual, expectedSetSize) + So(gotSet.SizeTotal, ShouldEqual, expectedSetSize) }) }) @@ -1775,7 +1812,9 @@ func TestServer(t *testing.T) { So(err, ShouldBeNil) So(gotSet.Status, ShouldEqual, set.Complete) So(gotSet.NumFiles, ShouldEqual, len(discovers)) - So(gotSet.Uploaded, ShouldEqual, len(discovers)) + So(gotSet.Uploaded, ShouldEqual, 1) + So(gotSet.Replaced, ShouldEqual, 0) + So(gotSet.Skipped, ShouldEqual, len(discovers)-1) So(gotSet.Symlinks, ShouldEqual, 1) entries, errg := client.GetFiles(exampleSet.ID()) @@ -1783,7 +1822,7 @@ func TestServer(t *testing.T) { So(len(entries), ShouldEqual, len(discovers)) for n, entry := range entries { - So(entry.Status, ShouldEqual, set.Uploaded) + So(entry.Status == set.Uploaded || entry.Status == set.Skipped, ShouldBeTrue) if n == 2 { So(entry.Type, ShouldEqual, set.Symlink) @@ -2730,7 +2769,7 @@ func TestServer(t *testing.T) { So(gotSet.NumFiles, ShouldEqual, 3) So(gotSet.Uploaded, ShouldEqual, 3) So(gotSet.Hardlinks, ShouldEqual, 2) - So(gotSet.SizeFiles, ShouldEqual, 1) + So(gotSet.SizeTotal, ShouldEqual, 1) Convey("moving all files of an inode uploads hardlinks to new location", func() { path4 := filepath.Join(localDir, "file2.link1") diff --git a/server/setdb.go b/server/setdb.go index 3046233..bbc122c 100644 --- a/server/setdb.go +++ b/server/setdb.go @@ -185,20 +185,14 @@ func (s *Server) LoadSetDB(path, backupPath string) error { } func (s *Server) setupDB(path, backupPath string, authGroup *gin.RouterGroup) error { - err := s.sendSlackMessage(slack.Info, "server starting, loading database") - if err != nil { - return err - } + s.sendSlackMessage(slack.Info, "server starting, loading database") db, err := set.New(path, backupPath) if err != nil { return err } - err = s.sendSlackMessage(slack.Success, "server loaded database") - if err != nil { - return err - } + s.sendSlackMessage(slack.Success, "server loaded database") go s.tellSlackStillRunning() @@ -211,12 +205,12 @@ func (s *Server) setupDB(path, backupPath string, authGroup *gin.RouterGroup) er return nil } -func (s *Server) sendSlackMessage(level slack.Level, msg string) error { +func (s *Server) sendSlackMessage(level slack.Level, msg string) { if s.slacker == nil { - return nil + return } - return s.slacker.SendMessage(level, msg) + s.slacker.SendMessage(level, msg) } func (s *Server) tellSlackStillRunning() { @@ -233,18 +227,15 @@ func (s *Server) tellSlackStillRunning() { for { select { case <-ticker.C: - err := s.serverStillRunning() - if err != nil { - return - } + s.serverStillRunning() case <-s.serverAliveCh: return } } } -func (s *Server) serverStillRunning() error { - return s.slacker.SendMessage(slack.Info, "server is still running") +func (s *Server) serverStillRunning() { + s.slacker.SendMessage(slack.Info, "server is still running") } // EnableRemoteDBBackups causes the database backup file to also be backed up to diff --git a/server/uploading.go b/server/uploading.go index 801e8bc..6762a99 100644 --- a/server/uploading.go +++ b/server/uploading.go @@ -84,11 +84,11 @@ func (ut *uploadTracker) createAndSendSlackMsg() { msg := fmt.Sprintf("%d client%s uploading", len(ut.uploading), suffix) - if ut.bouncing || msg == ut.lastMsg { + if ut.slacker == nil || ut.bouncing || msg == ut.lastMsg { return } - ut.slacker.SendMessage(slack.Info, msg) //nolint:errcheck + ut.slacker.SendMessage(slack.Info, msg) ut.lastMsg = msg ut.bouncing = true debounce := ut.debounce diff --git a/set/db.go b/set/db.go index bfc57d1..7d12e9e 100644 --- a/set/db.go +++ b/set/db.go @@ -228,7 +228,7 @@ func (d *DB) AddOrUpdate(set *Set) error { }) if err == nil { - err = set.SuccessfullyStoredInDB() + set.SuccessfullyStoredInDB() } return err @@ -557,10 +557,7 @@ func (d *DB) updateSetAfterDiscovery(setID string) (*Set, error) { return err } - err = set.DiscoveryCompleted(d.countAllFilesInSet(tx, setID)) - if err != nil { - return err - } + set.DiscoveryCompleted(d.countAllFilesInSet(tx, setID)) updatedSet = set @@ -740,10 +737,17 @@ func requestStatusToEntryStatus(r *put.Request, entry *Entry) { if r.Stuck != nil { entry.LastError = r.Stuck.String() } - case put.RequestStatusUploaded, put.RequestStatusUnmodified, put.RequestStatusReplaced: + case put.RequestStatusUploaded: entry.Status = Uploaded entry.unFailed = entry.Attempts > 1 entry.LastError = "" + case put.RequestStatusReplaced: + entry.Status = Replaced + entry.unFailed = entry.Attempts > 1 + entry.LastError = "" + case put.RequestStatusUnmodified: + entry.Status = Skipped + entry.LastError = "" case put.RequestStatusFailed: entry.Status = Failed @@ -1058,25 +1062,22 @@ func (d *DBRO) GetDirEntries(setID string) ([]*Entry, error) { // SetError updates a set with the given error message. Returns an error if the // setID isn't in the database. func (d *DB) SetError(setID, errMsg string) error { - return d.updateSetProperties(setID, func(got *Set) error { - return got.SetError(errMsg) + return d.updateSetProperties(setID, func(got *Set) { + got.SetError(errMsg) }) } // updateSetProperties retrives a set from the database and gives it to your // callback, allowing you to change properties on it. The altered set will then // be stored back in the database. -func (d *DB) updateSetProperties(setID string, cb func(*Set) error) error { +func (d *DB) updateSetProperties(setID string, cb func(*Set)) error { return d.db.Update(func(tx *bolt.Tx) error { set, bid, b, err := d.getSetByID(tx, setID) if err != nil { return err } - err = cb(set) - if err != nil { - return err - } + cb(set) return b.Put(bid, d.encodeToBytes(set)) }) @@ -1085,8 +1086,8 @@ func (d *DB) updateSetProperties(setID string, cb func(*Set) error) error { // SetWarning updates a set with the given warning message. Returns an error if // the setID isn't in the database. func (d *DB) SetWarning(setID, warnMsg string) error { - return d.updateSetProperties(setID, func(got *Set) error { - return got.SetWarning(warnMsg) + return d.updateSetProperties(setID, func(got *Set) { + got.SetWarning(warnMsg) }) } diff --git a/set/entries.go b/set/entries.go index dbc12b4..25c9e3f 100644 --- a/set/entries.go +++ b/set/entries.go @@ -45,8 +45,8 @@ const ( // UploadingEntry is an Entry status meaning the file has started to upload. UploadingEntry - // Uploaded is an Entry status meaning the file has been uploaded - // successfully. + // Uploaded is an Entry status meaning the file has been uploaded for the + // first time successfully. Uploaded // Failed is an Entry status meaning the file failed to upload due to some @@ -61,6 +61,14 @@ const ( // regular nor a symlink (ie. it's a fifo or socket etc.), so shouldn't be // uploaded. AbnormalEntry + + // Replaced is an Entry status meaning the file has been uploaded previously + // but was uploaded again because the local file was changed. + Replaced + + // Skipped is an Entry status meaning the file was not uploaded because it + // was uploaded previously and hasn't changed since. + Skipped ) type EntryType int @@ -86,6 +94,8 @@ func (e EntryStatus) String() string { "failed", "missing", "abnormal", + "replaced", + "skipped", }[e] } diff --git a/set/set.go b/set/set.go index 33724d4..8bb862f 100644 --- a/set/set.go +++ b/set/set.go @@ -39,7 +39,7 @@ import ( type Status int type Slacker interface { - SendMessage(level slack.Level, msg string) error + SendMessage(level slack.Level, msg string) } const ( @@ -129,15 +129,25 @@ type Set struct { // set, as of the last discovery. This is a read-only value. NumFiles uint64 - // SizeFiles provides the total size (bytes) of set and discovered files in + // SizeTotal provides the total size (bytes) of set and discovered files in // this set, as of the last discovery. This is a read-only value. - SizeFiles uint64 + SizeTotal uint64 // Uploaded provides the total number of set and discovered files in this - // set that have been uploaded or confirmed uploaded since the last - // discovery. This is a read-only value. + // set that have, for the first time, been uploaded or confirmed uploaded + // since the last discovery. This is a read-only value. Uploaded uint64 + // Replaced is like Uploaded, but for files that had previously been + // uploaded to iRODS, and now uploaded again because the file on local disk + // was newer. + Replaced uint64 + + // Skipped is like Uploaded, but for files that had previously been + // uploaded to iRODS, and were not uploaded again because the file on local + // disk was the same age. + Skipped uint64 + // Failed provides the total number of set and discovered files in this set // that have failed their upload since the last discovery. This is a // read-only value. @@ -178,6 +188,10 @@ type Set struct { // LastCompletedCount. This is a read-only value. LastCompletedSize uint64 + // SizeUploaded provides the size of files (bytes) actually uploaded (not + // skipped) since the last discovery. This is a read-only value. + SizeUploaded uint64 + // Error holds any error that applies to the whole set, such as an issue // with the Transformer. This is a read-only value. Error string @@ -244,7 +258,7 @@ func (s *Set) Size() string { return fmt.Sprintf("%s (as of last completion)", humanize.IBytes(s.LastCompletedSize)) } - sfiles := humanize.IBytes(s.SizeFiles) + sfiles := humanize.IBytes(s.SizeTotal) //nolint:misspell if s.Status != Complete { sfiles += " (and counting)" @@ -253,6 +267,13 @@ func (s *Set) Size() string { return sfiles } +// UploadedSize provides a string representation of SizeUploaded in a human +// readable format. This is the size of actual uploads (excluding skipped, +// unlike Size()) since the last discovery. +func (s *Set) UploadedSize() string { + return humanize.IBytes(s.SizeUploaded) //nolint:misspell +} + func (s *Set) TransformPath(path string) (string, error) { transformer, err := s.MakeTransformer() if err != nil { @@ -317,8 +338,8 @@ func (s *Set) Queued() bool { return s.Status == PendingDiscovery || s.Status == PendingUpload } -// countsValid tells you if our Uploaded, Failed and Missing counts are valid -// (0..NumFiles). +// countsValid tells you if our Uploaded, Replaced, Skipped, Failed and Missing +// counts are valid (0..NumFiles). func (s *Set) countsValid() bool { // we can't just do the final summed test, because if the numbers are close // to max uint64 value from a wrapping bug, they'll wrap back around and @@ -327,6 +348,14 @@ func (s *Set) countsValid() bool { return false } + if s.Replaced > s.NumFiles { + return false + } + + if s.Skipped > s.NumFiles { + return false + } + if s.Failed > s.NumFiles { return false } @@ -347,10 +376,10 @@ func (s *Set) countsValid() bool { return false } - return s.Uploaded+s.Failed+s.Missing+s.Abnormal <= s.NumFiles + return s.Uploaded+s.Replaced+s.Skipped+s.Failed+s.Missing+s.Abnormal <= s.NumFiles } -func (s *Set) adjustBasedOnEntry(entry *Entry) error { +func (s *Set) adjustBasedOnEntry(entry *Entry) { if entry.Type == Symlink { s.Symlinks-- } else if entry.Type == Hardlink { @@ -358,7 +387,12 @@ func (s *Set) adjustBasedOnEntry(entry *Entry) error { } if entry.newSize { - s.SizeFiles += entry.Size + s.SizeTotal += entry.Size + s.SizeUploaded += entry.Size + } + + if entry.Status == Skipped { + s.SizeUploaded -= entry.Size } if entry.unFailed { @@ -369,26 +403,24 @@ func (s *Set) adjustBasedOnEntry(entry *Entry) error { } } - return s.entryToSetCounts(entry) + s.entryToSetCounts(entry) } // entryToSetCounts increases set Uploaded, Failed or Missing based on // set.Status. -func (s *Set) entryToSetCounts(entry *Entry) error { - err := s.entryStatusToSetCounts(entry) - if err != nil { - return err - } - +func (s *Set) entryToSetCounts(entry *Entry) { + s.entryStatusToSetCounts(entry) s.entryTypeToSetCounts(entry) - - return nil } -func (s *Set) entryStatusToSetCounts(entry *Entry) error { +func (s *Set) entryStatusToSetCounts(entry *Entry) { //nolint:gocyclo switch entry.Status { //nolint:exhaustive case Uploaded: s.Uploaded++ + case Replaced: + s.Replaced++ + case Skipped: + s.Skipped++ case Failed: if entry.newFail { s.Failed++ @@ -396,24 +428,21 @@ func (s *Set) entryStatusToSetCounts(entry *Entry) error { if entry.Attempts >= AttemptsToBeConsideredFailing { s.Status = Failing - - return s.sendSlackMessage(slack.Error, "has failed uploads") + s.sendSlackMessage(slack.Error, "has failed uploads") } case Missing: s.Missing++ case AbnormalEntry: s.Abnormal++ } - - return nil } -func (s *Set) sendSlackMessage(level slack.Level, msg string) error { +func (s *Set) sendSlackMessage(level slack.Level, msg string) { if s.slacker == nil { - return nil + return } - return s.slacker.SendMessage(level, s.createSlackMessage(msg)) + s.slacker.SendMessage(level, s.createSlackMessage(msg)) } func (s *Set) createSlackMessage(msg string) string { @@ -437,13 +466,13 @@ func (s *Set) LogChangesToSlack(slacker Slacker) { // SuccessfullyStoredInDB should be called when you successfully store the set // in DB. -func (s *Set) SuccessfullyStoredInDB() error { - return s.sendSlackMessage(slack.Info, "stored in db") +func (s *Set) SuccessfullyStoredInDB() { + s.sendSlackMessage(slack.Info, "stored in db") } // DiscoveryCompleted should be called when you complete discovering a set. Pass // in the number of files you discovered. -func (s *Set) DiscoveryCompleted(numFiles uint64) error { +func (s *Set) DiscoveryCompleted(numFiles uint64) { s.LastDiscovery = time.Now() s.NumFiles = numFiles @@ -451,59 +480,57 @@ func (s *Set) DiscoveryCompleted(numFiles uint64) error { s.Status = Complete s.LastCompleted = time.Now() - return s.sendSlackMessage(slack.Warn, "completed discovery and backup due to no files") + s.sendSlackMessage(slack.Warn, "completed discovery and backup due to no files") + + return } s.Status = PendingUpload - return s.sendSlackMessage(slack.Info, fmt.Sprintf("completed discovery: %d files", numFiles)) + s.sendSlackMessage(slack.Info, fmt.Sprintf("completed discovery: %d files", numFiles)) } // UpdateBasedOnEntry updates set status values based on an updated Entry // from updateFileEntry(), assuming that request is for one of set's file // entries. func (s *Set) UpdateBasedOnEntry(entry *Entry, getFileEntries func(string) ([]*Entry, error)) error { - err := s.checkIfUploading() - if err != nil { - return err - } + s.checkIfUploading() - err = s.adjustBasedOnEntry(entry) - if err != nil { - return err - } + s.adjustBasedOnEntry(entry) - err = s.fixCounts(entry, getFileEntries) + err := s.fixCounts(entry, getFileEntries) if err != nil { return err } - return s.checkIfComplete() + s.checkIfComplete() + + return nil } -func (s *Set) checkIfUploading() error { +func (s *Set) checkIfUploading() { if !(s.Status == PendingDiscovery || s.Status == PendingUpload) { - return nil + return } s.Status = Uploading - return s.sendSlackMessage(slack.Info, "started uploading files") + s.sendSlackMessage(slack.Info, "started uploading files") } -func (s *Set) checkIfComplete() error { - if !(s.Uploaded+s.Failed+s.Missing+s.Abnormal == s.NumFiles) { - return nil +func (s *Set) checkIfComplete() { + if !(s.Uploaded+s.Replaced+s.Skipped+s.Failed+s.Missing+s.Abnormal == s.NumFiles) { + return } s.Status = Complete s.LastCompleted = time.Now() - s.LastCompletedCount = s.Uploaded + s.Failed - s.LastCompletedSize = s.SizeFiles + s.LastCompletedCount = s.Uploaded + s.Replaced + s.Skipped + s.Failed + s.LastCompletedSize = s.SizeTotal - return s.sendSlackMessage(slack.Success, fmt.Sprintf("completed backup "+ - "(%d uploaded; %d failed; %d missing; %d abnormal; %s of data)", - s.Uploaded, s.Failed, s.Missing, s.Abnormal, s.Size())) + s.sendSlackMessage(slack.Success, fmt.Sprintf("completed backup "+ + "(%d newly uploaded; %d replaced; %d skipped; %d failed; %d missing; %d abnormal; %s data uploaded)", + s.Uploaded, s.Replaced, s.Skipped, s.Failed, s.Missing, s.Abnormal, s.UploadedSize())) } // fixCounts resets the set counts to 0 and goes through all the entries for @@ -520,19 +547,23 @@ func (s *Set) fixCounts(entry *Entry, getFileEntries func(string) ([]*Entry, err } s.Uploaded = 0 + s.Replaced = 0 + s.Skipped = 0 s.Failed = 0 s.Missing = 0 s.Abnormal = 0 s.Symlinks = 0 s.Hardlinks = 0 - return s.updateAllCounts(entries, entry) + s.updateAllCounts(entries, entry) + + return nil } // updateAllCounts should be called after setting all counts to 0 (because they // had become invalid), and then recalculates the counts. Also marks the given // entry as newFail if any entry in entries is Failed. -func (s *Set) updateAllCounts(entries []*Entry, entry *Entry) error { +func (s *Set) updateAllCounts(entries []*Entry, entry *Entry) { for _, e := range entries { if e.Path == entry.Path { e = entry @@ -542,32 +573,27 @@ func (s *Set) updateAllCounts(entries []*Entry, entry *Entry) error { e.newFail = true } - err := s.entryToSetCounts(e) - if err != nil { - return err - } + s.entryToSetCounts(e) } - - return nil } // SetError records the given error against the set, indicating it wont work. -func (s *Set) SetError(errMsg string) error { +func (s *Set) SetError(errMsg string) { s.Error = errMsg - return s.sendSlackMessage(slack.Error, "is invalid: "+errMsg) + s.sendSlackMessage(slack.Error, "is invalid: "+errMsg) } // SetWarning records the given warning against the set, indicating it has an // issue. -func (s *Set) SetWarning(warnMsg string) error { +func (s *Set) SetWarning(warnMsg string) { s.Warning = warnMsg - return s.sendSlackMessage(slack.Warn, "has an issue: "+warnMsg) + s.sendSlackMessage(slack.Warn, "has an issue: "+warnMsg) } -func (s *Set) RecoveryError(err error) error { - return s.sendSlackMessage(slack.Error, "could not be recovered: "+err.Error()) +func (s *Set) RecoveryError(err error) { + s.sendSlackMessage(slack.Error, "could not be recovered: "+err.Error()) } // copyUserProperties copies data from one set into another. @@ -584,8 +610,11 @@ func (s *Set) copyUserProperties(copySet *Set) { func (s *Set) reset() { s.StartedDiscovery = time.Now() s.NumFiles = 0 - s.SizeFiles = 0 + s.SizeTotal = 0 + s.SizeUploaded = 0 s.Uploaded = 0 + s.Replaced = 0 + s.Skipped = 0 s.Failed = 0 s.Missing = 0 s.Abnormal = 0 diff --git a/set/set_test.go b/set/set_test.go index 66ee1f9..09431c5 100644 --- a/set/set_test.go +++ b/set/set_test.go @@ -158,7 +158,7 @@ func TestSet(t *testing.T) { So(s.Count(), ShouldEqual, "3") So(s.Size(), ShouldEqual, "0 B (and counting)") - s.SizeFiles = 30 + s.SizeTotal = 30 So(s.Size(), ShouldEqual, "30 B (and counting)") s.Status = Complete @@ -168,7 +168,7 @@ func TestSet(t *testing.T) { s.LastCompletedCount = 3 s.LastCompletedSize = 30 s.NumFiles = 0 - s.SizeFiles = 0 + s.SizeTotal = 0 s.Status = PendingDiscovery So(s.Count(), ShouldEqual, "3 (as of last completion)") @@ -227,16 +227,16 @@ func TestSet(t *testing.T) { month := 730 * time.Hour sets := []*Set{ {Name: "setA", Requester: "userA", LastCompleted: nov23, - SizeFiles: 10, NumFiles: 1}, + SizeTotal: 10, NumFiles: 1}, {Name: "setB", Requester: "userA", LastCompleted: nov23.Add(month), - SizeFiles: 20, NumFiles: 2}, + SizeTotal: 20, NumFiles: 2}, {Name: "setC", Requester: "userB", LastCompleted: nov23, - SizeFiles: 40, NumFiles: 3}, + SizeTotal: 40, NumFiles: 3}, {Name: "setD", Requester: "userC", LastCompleted: nov23.Add(-1 * month), - SizeFiles: 1, NumFiles: 4}, + SizeTotal: 1, NumFiles: 4}, {Name: "setE", Requester: "userD", LastCompleted: nov23.Add(-1 * month), - SizeFiles: 1, NumFiles: 5}, - {Name: "setF", Requester: "userE", SizeFiles: 0, NumFiles: 1}, + SizeTotal: 1, NumFiles: 5}, + {Name: "setF", Requester: "userE", SizeTotal: 0, NumFiles: 1}, } usage := UsageSummary(sets) @@ -289,7 +289,7 @@ func TestSet(t *testing.T) { sets = []*Set{ {Name: "setA", Requester: "userA", - SizeFiles: 10 * uint64(bytesInTiB), NumFiles: 1}, + SizeTotal: 10 * uint64(bytesInTiB), NumFiles: 1}, } usage = UsageSummary(sets) @@ -523,7 +523,7 @@ func TestSetDB(t *testing.T) { So(sets[0].StartedDiscovery.IsZero(), ShouldBeFalse) So(sets[0].LastDiscovery.IsZero(), ShouldBeFalse) So(sets[0].NumFiles, ShouldEqual, 5) - So(sets[0].SizeFiles, ShouldEqual, 0) + So(sets[0].SizeTotal, ShouldEqual, 0) setsAll, errg := db.GetAll() So(errg, ShouldBeNil) @@ -554,7 +554,7 @@ func TestSetDB(t *testing.T) { So(sets[0].Status, ShouldEqual, Uploading) So(sets[0].NumFiles, ShouldEqual, 5) - So(sets[0].SizeFiles, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 3) So(sets[0].Uploaded, ShouldEqual, 0) So(sets[0].LastCompletedSize, ShouldEqual, 0) @@ -588,7 +588,7 @@ func TestSetDB(t *testing.T) { So(sets[0].Status, ShouldEqual, Uploading) So(sets[0].NumFiles, ShouldEqual, 5) - So(sets[0].SizeFiles, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 3) So(sets[0].Uploaded, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 0) So(sets[0].LastCompletedSize, ShouldEqual, 0) @@ -616,14 +616,16 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Uploading) - So(sets[0].SizeFiles, ShouldEqual, 5) - So(sets[0].Uploaded, ShouldEqual, 2) + So(sets[0].SizeTotal, ShouldEqual, 5) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 0) + So(sets[0].Skipped, ShouldEqual, 1) fEntries, err = db.GetFileEntries(sets[0].ID()) So(err, ShouldBeNil) So(len(fEntries), ShouldEqual, 5) So(fEntries[1].Size, ShouldEqual, 2) - So(fEntries[1].Status, ShouldEqual, Uploaded) + So(fEntries[1].Status, ShouldEqual, Skipped) So(fEntries[1].LastAttempt.IsZero(), ShouldBeFalse) r = &put.Request{ @@ -646,15 +648,17 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].NumFiles, ShouldEqual, 5) - So(sets[0].Uploaded, ShouldEqual, 3) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Status, ShouldEqual, Uploading) - So(sets[0].SizeFiles, ShouldEqual, 9) + So(sets[0].SizeTotal, ShouldEqual, 9) fEntries, err = db.GetFileEntries(sets[0].ID()) So(err, ShouldBeNil) So(len(fEntries), ShouldEqual, 5) So(fEntries[2].Size, ShouldEqual, 4) - So(fEntries[2].Status, ShouldEqual, Uploaded) + So(fEntries[2].Status, ShouldEqual, Replaced) So(fEntries[2].LastAttempt.IsZero(), ShouldBeFalse) r = &put.Request{ @@ -684,8 +688,10 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Uploading) - So(sets[0].SizeFiles, ShouldEqual, 15) - So(sets[0].Uploaded, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 15) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 1) fEntries, err = db.GetFileEntries(sets[0].ID()) @@ -714,8 +720,10 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Uploading) - So(sets[0].SizeFiles, ShouldEqual, 15) - So(sets[0].Uploaded, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 15) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 1) fEntries, err = db.GetFileEntries(sets[0].ID()) @@ -739,8 +747,10 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Failing) - So(sets[0].SizeFiles, ShouldEqual, 15) - So(sets[0].Uploaded, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 15) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 1) So(slackWriter.String(), ShouldEqual, slack.BoxPrefixError+"`jim.set1` has failed uploads") slackWriter.Reset() @@ -770,19 +780,22 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Complete) - So(sets[0].SizeFiles, ShouldEqual, 15) - So(sets[0].Uploaded, ShouldEqual, 3) + So(sets[0].SizeTotal, ShouldEqual, 15) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 1) So(sets[0].Missing, ShouldEqual, 1) So(slackWriter.String(), ShouldEqual, fmt.Sprintf("%s`jim.set1` completed backup "+ - "(%d uploaded; %d failed; %d missing; %d abnormal; %s of data)", - slack.BoxPrefixSuccess, sets[0].Uploaded, sets[0].Failed, - sets[0].Missing, sets[0].Abnormal, sets[0].Size())) + "(%d newly uploaded; %d replaced; %d skipped; %d failed; %d missing; %d abnormal; %s data uploaded)", + slack.BoxPrefixSuccess, sets[0].Uploaded, sets[0].Replaced, sets[0].Skipped, sets[0].Failed, + sets[0].Missing, sets[0].Abnormal, sets[0].UploadedSize())) lastCompleted := sets[0].LastCompleted So(lastCompleted.IsZero(), ShouldBeFalse) So(sets[0].LastCompletedSize, ShouldEqual, 15) So(sets[0].LastCompletedCount, ShouldEqual, 4) + So(sets[0].SizeUploaded, ShouldEqual, 13) fEntries, err = db.GetFileEntries(sets[0].ID()) So(err, ShouldBeNil) @@ -811,8 +824,10 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Complete) - So(sets[0].SizeFiles, ShouldEqual, 15) - So(sets[0].Uploaded, ShouldEqual, 4) + So(sets[0].SizeTotal, ShouldEqual, 15) + So(sets[0].Uploaded, ShouldEqual, 2) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) So(sets[0].Failed, ShouldEqual, 0) lastCompleted2 := sets[0].LastCompleted So(lastCompleted2.After(lastCompleted), ShouldBeTrue) @@ -845,12 +860,15 @@ func TestSetDB(t *testing.T) { So(sets[0].Status, ShouldEqual, PendingDiscovery) So(sets[0].StartedDiscovery.After(oldStart), ShouldBeTrue) So(sets[0].NumFiles, ShouldEqual, 0) - So(sets[0].SizeFiles, ShouldEqual, 0) + So(sets[0].SizeTotal, ShouldEqual, 0) So(sets[0].Uploaded, ShouldEqual, 0) + So(sets[0].Replaced, ShouldEqual, 0) + So(sets[0].Skipped, ShouldEqual, 0) So(sets[0].Failed, ShouldEqual, 0) So(sets[0].Missing, ShouldEqual, 0) So(sets[0].LastCompletedCount, ShouldEqual, 4) So(sets[0].LastCompletedSize, ShouldEqual, 15) + So(sets[0].SizeUploaded, ShouldEqual, 0) }) fEntries, errg := db.GetFileEntries(sets[0].ID()) @@ -864,7 +882,7 @@ func TestSetDB(t *testing.T) { So(sets[0].Status, ShouldEqual, PendingUpload) So(sets[0].LastDiscovery.After(oldDisc), ShouldBeTrue) So(sets[0].NumFiles, ShouldEqual, 6) - So(sets[0].SizeFiles, ShouldEqual, 0) + So(sets[0].SizeTotal, ShouldEqual, 0) r = &put.Request{ Local: "/g/h/l.txt", @@ -885,9 +903,12 @@ func TestSetDB(t *testing.T) { So(err, ShouldBeNil) So(sets[0].Status, ShouldEqual, Uploading) - So(sets[0].SizeFiles, ShouldEqual, 7) + So(sets[0].SizeTotal, ShouldEqual, 7) So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 0) + So(sets[0].Replaced, ShouldEqual, 0) So(sets[0].Failed, ShouldEqual, 0) + So(sets[0].SizeUploaded, ShouldEqual, 7) fEntries, err = db.GetFileEntries(sets[0].ID()) So(err, ShouldBeNil) @@ -900,6 +921,60 @@ func TestSetDB(t *testing.T) { fEntries, err = db.GetPureFileEntries(sets[0].ID()) So(err, ShouldBeNil) So(len(fEntries), ShouldEqual, 3) + + r = &put.Request{ + Local: "/g/i/m.txt", + Requester: set.Requester, + Set: set.Name, + Size: 6, + Status: put.RequestStatusUploading, + Error: "", + } + + _, err = db.SetEntryStatus(r) + So(err, ShouldBeNil) + + r.Status = put.RequestStatusReplaced + _, err = db.SetEntryStatus(r) + So(err, ShouldBeNil) + + sets, err = db.GetByRequester("jim") + So(err, ShouldBeNil) + + So(sets[0].Status, ShouldEqual, Uploading) + So(sets[0].SizeTotal, ShouldEqual, 13) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 0) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Failed, ShouldEqual, 0) + So(sets[0].SizeUploaded, ShouldEqual, 13) + + r = &put.Request{ + Local: "/g/i/n.txt", + Requester: set.Requester, + Set: set.Name, + Size: 5, + Status: put.RequestStatusUploading, + Error: "", + } + + _, err = db.SetEntryStatus(r) + So(err, ShouldBeNil) + + r.Status = put.RequestStatusUnmodified + _, err = db.SetEntryStatus(r) + So(err, ShouldBeNil) + + sets, err = db.GetByRequester("jim") + So(err, ShouldBeNil) + + So(sets[0].Status, ShouldEqual, Uploading) + So(sets[0].SizeTotal, ShouldEqual, 18) + So(sets[0].Uploaded, ShouldEqual, 1) + So(sets[0].Skipped, ShouldEqual, 1) + So(sets[0].Replaced, ShouldEqual, 1) + So(sets[0].Failed, ShouldEqual, 0) + So(sets[0].SizeUploaded, ShouldEqual, 13) }) Convey("Set status becomes complete on new discovery with all missing files", func() { @@ -927,7 +1002,7 @@ func TestSetDB(t *testing.T) { So(sets[0].LastDiscovery.After(oldDisc), ShouldBeTrue) So(sets[0].NumFiles, ShouldEqual, 3) So(sets[0].Missing, ShouldEqual, 3) - So(sets[0].SizeFiles, ShouldEqual, 0) + So(sets[0].SizeTotal, ShouldEqual, 0) So(slackWriter.String(), ShouldEqual, slack.BoxPrefixWarn+ "`jim.set1` completed discovery and backup due to no files") @@ -954,7 +1029,7 @@ func TestSetDB(t *testing.T) { So(sets[0].LastDiscovery.After(oldDisc), ShouldBeTrue) So(sets[0].NumFiles, ShouldEqual, 0) So(sets[0].Missing, ShouldEqual, 0) - So(sets[0].SizeFiles, ShouldEqual, 0) + So(sets[0].SizeTotal, ShouldEqual, 0) So(slackWriter.String(), ShouldEqual, slack.BoxPrefixWarn+ "`jim.set1` completed discovery and backup due to no files") diff --git a/set/summary.go b/set/summary.go index 6d6c6fa..59c4635 100644 --- a/set/summary.go +++ b/set/summary.go @@ -51,7 +51,7 @@ func (s *SizeAndNumber) SizeTiB() float64 { // add increases our totals based on the info in the given set. func (s *SizeAndNumber) add(set *Set) { - s.Size += set.SizeFiles + s.Size += set.SizeTotal s.Number += set.NumFiles } diff --git a/slack/slack.go b/slack/slack.go index 5973af3..61194bd 100644 --- a/slack/slack.go +++ b/slack/slack.go @@ -27,6 +27,8 @@ package slack import ( + "io" + slackGo "github.com/slack-go/slack" gas "github.com/wtsi-hgi/go-authserver" ) @@ -61,12 +63,18 @@ type Config struct { // URL is optional and only needs to be set when testing with a local mock // slack server. URL string + + // ErrorLogger is an optional place that any failures to send slack messages + // are written to; to prevent issues our SendMessage() never returns an + // error because it runs in a goroutine and returns immediately. + ErrorLogger io.Writer } // Slack is something that lets you send messages to Slack. type Slack struct { api *slackGo.Client channel string + logger io.Writer } // New creates a new Slack using the Token, Channel and URL (if provided) from @@ -85,15 +93,22 @@ func New(config Config) *Slack { return &Slack{ api: slackGo.New(config.Token, options...), channel: config.Channel, + logger: config.ErrorLogger, } } // SendMessage sends the given message to our configured channel, prefixing it // with a colour corresponding to its level. -func (s *Slack) SendMessage(level Level, msg string) error { - _, _, _, err := s.api.SendMessage(s.channel, slackGo.MsgOptionText(levelToPrefix(level)+msg, false)) //nolint:dogsled - - return err +// +// NB: this returns immediately, sending in a goroutine. To see errors, configer +// the slacker with an ErrorLogger. +func (s *Slack) SendMessage(level Level, msg string) { + go func() { + _, _, _, err := s.api.SendMessage(s.channel, slackGo.MsgOptionText(levelToPrefix(level)+msg, false)) + if s.logger != nil && err != nil { + s.logger.Write([]byte(err.Error())) //nolint:errcheck + } + }() } func levelToPrefix(level Level) string { @@ -119,8 +134,6 @@ func NewMock(logger *gas.StringLogger) *Mock { return &Mock{logger: logger} } -func (s *Mock) SendMessage(level Level, msg string) error { - _, err := s.logger.Write([]byte(levelToPrefix(level) + msg)) - - return err +func (s *Mock) SendMessage(level Level, msg string) { + s.logger.Write([]byte(levelToPrefix(level) + msg)) //nolint:errcheck } diff --git a/slack/slack_test.go b/slack/slack_test.go index eb2951c..cf57b6c 100644 --- a/slack/slack_test.go +++ b/slack/slack_test.go @@ -34,6 +34,7 @@ import ( slackGo "github.com/slack-go/slack" "github.com/slack-go/slack/slacktest" . "github.com/smartystreets/goconvey/convey" + gas "github.com/wtsi-hgi/go-authserver" ) const ( @@ -50,12 +51,37 @@ func TestRealSlack(t *testing.T) { t.Skip("IBACKUP_SLACK_TOKEN not set or IBACKUP_SLACK_CHANNEL not set") } + logWriter := gas.NewStringLogger() + Convey("You can send a message to real slack", t, func() { - s := New(Config{Token: token, Channel: channel}) + s := New(Config{Token: token, Channel: channel, ErrorLogger: logWriter}) msg := "github.com/wtsi-hgi/ibackup slack package test" - err := s.SendMessage(Info, msg) - So(err, ShouldBeNil) + s.SendMessage(Info, msg) + + <-time.After(1 * time.Second) + + So(logWriter.String(), ShouldBeBlank) + }) + + Convey("Bad token/channel results in error being logged", t, func() { + config := Config{Token: "non", Channel: "sense", ErrorLogger: logWriter} + s := New(config) + + msg := "github.com/wtsi-hgi/ibackup slack package error test" + s.SendMessage(Info, msg) + + <-time.After(1 * time.Second) + + So(logWriter.String(), ShouldEqual, "invalid_auth") + + Convey("And that works fine with no ErrorLogger", func() { + config.ErrorLogger = nil + s = New(config) + + s.SendMessage(Info, msg) + <-time.After(1 * time.Second) + }) }) } @@ -66,9 +92,7 @@ func TestMockSlack(t *testing.T) { defer dfunc() - err := s.SendMessage(Info, testMessage) - So(err, ShouldBeNil) - + s.SendMessage(Info, testMessage) checkMessage(BoxPrefixInfo+testMessage, messageChan) }) @@ -78,24 +102,16 @@ func TestMockSlack(t *testing.T) { defer dfunc() - err := s.SendMessage(Info, testMessage) - So(err, ShouldBeNil) - + s.SendMessage(Info, testMessage) checkMessage(BoxPrefixInfo+testMessage, messageChan) - err = s.SendMessage(Warn, testMessage) - So(err, ShouldBeNil) - + s.SendMessage(Warn, testMessage) checkMessage(BoxPrefixWarn+testMessage, messageChan) - err = s.SendMessage(Error, testMessage) - So(err, ShouldBeNil) - + s.SendMessage(Error, testMessage) checkMessage(BoxPrefixError+testMessage, messageChan) - err = s.SendMessage(Success, testMessage) - So(err, ShouldBeNil) - + s.SendMessage(Success, testMessage) checkMessage(BoxPrefixSuccess+testMessage, messageChan) }) }