Skip to content

Commit

Permalink
Added ability to get objects from GDS
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewpatto committed Sep 24, 2024
1 parent a69a130 commit 002cc23
Show file tree
Hide file tree
Showing 8 changed files with 1,239 additions and 557 deletions.
4 changes: 2 additions & 2 deletions dev/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"@aws-sdk/client-cloudformation": "3.629.0",
"@aws-sdk/client-s3": "3.629.0",
"@aws-sdk/client-sfn": "3.629.0",
"aws-cdk": "2.141.0",
"aws-cdk-lib": "2.141.0",
"aws-cdk": "2.159.1",
"aws-cdk-lib": "2.159.1",
"constructs": "10.3.0",
"steps-s3-copy": "link:../packages/steps-s3-copy"
},
Expand Down
10 changes: 6 additions & 4 deletions packages/steps-s3-copy/docker/rclone-batch-docker-image/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ module rclone-batch
go 1.21

require (
github.com/aws/aws-sdk-go-v2 v1.23.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.25.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 // indirect
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.33.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sfn v1.22.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.2 // indirect
github.com/aws/smithy-go v1.17.0 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/cavaliergopher/grab/v3 v3.0.1 // indirect
gonum.org/v1/gonum v0.14.0 // indirect
)
12 changes: 12 additions & 0 deletions packages/steps-s3-copy/docker/rclone-batch-docker-image/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/aws/aws-sdk-go-v2 v1.23.0 h1:PiHAzmiQQr6JULBUdvR8fKlA+UPKLT/8KbiqpFBWiAo=
github.com/aws/aws-sdk-go-v2 v1.23.0/go.mod h1:i1XDttT4rnf6vxc9AuskLc6s7XBee8rlLilKlc03uAA=
github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U=
github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA=
github.com/aws/aws-sdk-go-v2/config v1.25.1 h1:YsjngBOl2mx4l3egkVWndr6/6TqtkdsWJFZIsQ924Ek=
github.com/aws/aws-sdk-go-v2/config v1.25.1/go.mod h1:yV6h7TRVzhdIFmUk9WWDRpWwYGg1woEzKr0k1IYz2Tk=
github.com/aws/aws-sdk-go-v2/credentials v1.16.1 h1:WessyrdgyFN5TB+eLQdrFSlN/3oMnqukIFhDxK6z8h0=
Expand All @@ -8,14 +10,20 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4 h1:9wKDWEjwSnXZre0/O3+ZwbB
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.4/go.mod h1:t4i+yGHMCcUNIX1x7YVYa6bH/Do7civ5I6cG/6PMfyA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3 h1:DUwbD79T8gyQ23qVXFUthjzVMTviSHi3y4z58KvghhM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.3/go.mod h1:7sGSz1JCKHWWBHq98m6sMtWQikmYPpxjqOydDemiVoM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3 h1:AplLJCtIaUZDCbr6+gLYdsYNxne4iuaboJhVt9d+WXI=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.3/go.mod h1:ify42Rb7nKeDDPkFjKn7q1bPscVPu/+gmHH8d2c+anU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.0 h1:usgqiJtamuGIBj+OvYmMq89+Z1hIKkMJToz1WpoeNUY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.0/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1 h1:rpkF4n0CyFcrJUG/rNNohoTmhtWlFTRI4BsZOh9PvLs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.1/go.mod h1:l9ymW25HOqymeU2m1gbUQ3rUIsTwKs8gYHXkqDQUhiI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3 h1:kJOolE8xBAD13xTCgOakByZkyP4D/owNmvEiioeUNAg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.3/go.mod h1:Owv1I59vaghv1Ax8zz8ELY8DN7/Y0rGS+WWAmjgi950=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.33.1 h1:7zorIXF9yoza6zOQCzGxQBF3CWeuN3qvS/gm25k/vYI=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.33.1/go.mod h1:WyLS5qwXHtjKAONYZq/4ewdd+hcVsa3LBu77Ow5uj3k=
github.com/aws/aws-sdk-go-v2/service/sfn v1.22.1 h1:kT2DEMbibZlhGbGiM3p34tNOI8bh82x9Fe7HLCrnjho=
github.com/aws/aws-sdk-go-v2/service/sfn v1.22.1/go.mod h1:AM0scrWjoTK5MJ4++nLnqkdJfwAy9b0jV7EcANYn59M=
github.com/aws/aws-sdk-go-v2/service/sso v1.17.2 h1:V47N5eKgVZoRSvx2+RQ0EpAEit/pqOhqeSQFiS4OFEQ=
Expand All @@ -26,5 +34,9 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.25.2 h1:vwyiRTnXLqsak/6WAQ+uTRhVqKI6
github.com/aws/aws-sdk-go-v2/service/sts v1.25.2/go.mod h1:4EqRHDCKP78hq3zOnmFXu5k0j4bXbRFfCh/zQ6KnEfQ=
github.com/aws/smithy-go v1.17.0 h1:wWJD7LX6PBV6etBUwO0zElG0nWN9rUhp0WdYeHSHAaI=
github.com/aws/smithy-go v1.17.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/cavaliergopher/grab/v3 v3.0.1 h1:4z7TkBfmPjmLAAmkkAZNX/6QJ1nNFdv3SdIHXju0Fr4=
github.com/cavaliergopher/grab/v3 v3.0.1/go.mod h1:1U/KNnD+Ft6JJiYoYBAimKH2XrYptb8Kl3DFGmsjpq4=
gonum.org/v1/gonum v0.14.0 h1:2NiG67LD1tEH0D7kM+ps2V+fXmsAnpUeec7n8tcr4S0=
gonum.org/v1/gonum v0.14.0/go.mod h1:AoWeoz0becf9QMWtE8iWXNXc27fK4fNeHNf/oMejGfU=
155 changes: 129 additions & 26 deletions packages/steps-s3-copy/docker/rclone-batch-docker-image/rclone-batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
"github.com/aws/aws-sdk-go-v2/service/sfn"
"io"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"regexp"
"strconv"
"strings"
"syscall"
Expand All @@ -23,6 +27,40 @@ import (
// presumably the name of the environment variable that supplies the path of the rclone
// binary to execute - NOTE(review): the lookup itself is outside this view, confirm
const rcloneBinaryEnvName = "RB_RCLONE_BINARY"
// presumably the name of the environment variable that supplies the copy destination
// - NOTE(review): the lookup itself is outside this view, confirm
const destinationEnvName = "RB_DESTINATION"

// some temporary code with the ability to source from GDS

// base URL that the GDS lookup in main prefixes to its "/files" query when it resolves
// a GDS object to a presigned URL
const ICA_BASE_URL = "https://aps2.platform.illumina.com/v1"

// gdsFileItem is one entry of the "items" array in a GDS files listing.
// Field names map one-to-one onto the JSON keys given in the struct tags.
type gdsFileItem struct {
	ID             string    `json:"id,omitempty"`
	Name           string    `json:"name,omitempty"`
	VolumeID       string    `json:"volumeId,omitempty"`
	ParentFolderID string    `json:"parentFolderId,omitempty"`
	VolumeName     string    `json:"volumeName,omitempty"`
	Type           string    `json:"type,omitempty"`
	TenantID       string    `json:"tenantId,omitempty"`
	SubTenantID    string    `json:"subTenantId,omitempty"`
	Path           string    `json:"path,omitempty"`
	TimeCreated    time.Time `json:"timeCreated"`
	CreatedBy      string    `json:"createdBy,omitempty"`
	TimeModified   time.Time `json:"timeModified"`
	ModifiedBy     string    `json:"modifiedBy,omitempty"`
	Urn            string    `json:"urn,omitempty"`
	SizeInBytes    int64     `json:"sizeInBytes"`
	IsUploaded     bool      `json:"isUploaded"`
	ArchiveStatus  string    `json:"archiveStatus,omitempty"`
	StorageTier    string    `json:"storageTier,omitempty"`
	ETag           string    `json:"eTag,omitempty"`
	Format         string    `json:"format,omitempty"`
	FormatEdam     string    `json:"formatEdam,omitempty"`
	Status         string    `json:"status,omitempty"`
	PresignedURL   string    `json:"presignedUrl,omitempty"`
}

// GdsFilesResponse is the shape that the JSON body of the GDS "/files" query
// (issued against ICA_BASE_URL) is unmarshalled into. The copy path reads the
// PresignedURL of the first item and hands it to rclone as the source.
type GdsFilesResponse struct {
	Items          []gdsFileItem `json:"items"`
	ItemCount      int           `json:"itemCount"`
	FirstPageToken string        `json:"firstPageToken,omitempty"`
}

// our parent ECS task (when a SPOT instance) can be sent a TERM signal - we then have a hard
// limit of 120 seconds before the process is hard killed
// this value here is the seconds to wait after receiving the TERM in the hope that our
Expand Down Expand Up @@ -112,20 +150,85 @@ func main() {

log.Printf("Asked to copy %s as the %d object to copy", source, which)

if !interrupted {
// setup an rclone copy with capture stats (noting that stats are sent to stderr)
cmd := exec.Command(rcloneBinary,
"--use-json-log",
"--stats-log-level", "NOTICE",
"--stats-one-line",
// only display stats at the end (after 10000 hours)
"--stats", "10000h",
// normally no bandwidth limiting ("0") - but can institute bandwidth limit if asked
"--bwlimit", If(debugBandwidthOk, debugBandwidth, "0"),
// setup rclone args that are used by all copy paths
var copyArgs []string

copyArgs = append(copyArgs, "--use-json-log",
// we capture stats (noting that stats are sent to stderr)
"--stats-one-line",
"--stats-log-level", "NOTICE",
// only display stats at the end (after 10000 hours)
"--stats", "10000h",
// normally no bandwidth limiting ("0") - but can institute bandwidth limit if asked
"--bwlimit", If(debugBandwidthOk, debugBandwidth, "0"),
)

// umccr specific workaround to make GDS available
if strings.HasPrefix(source, "s3:production") || strings.HasPrefix(source, "s3:development") {
secretsSvc := secretsmanager.NewFromConfig(cfg)

secretResp, secretErr := secretsSvc.GetSecretValue(context.TODO(), &secretsmanager.GetSecretValueInput{
SecretId: aws.String("IcaSecretsPortal"),
})

if secretErr != nil { // pragma: allowlist secret
log.Fatalf("Unable to get IcaSecretsPortal secret: %v", secretErr)
}

secret := aws.ToString(secretResp.SecretString) // pragma: allowlist secret

r := regexp.MustCompile(`s3:(?P<Volume>[^/]+)(?P<Path>.+)`)
p := r.FindStringSubmatch(source)

url := fmt.Sprintf("%s/files?include=PresignedUrl&volume.name=%s&path=%s", ICA_BASE_URL, p[1], p[2])

log.Println(url)

client := http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatalf("Unable to create request: %v", err)
}
req.Header = http.Header{
"Host": {"www.host.com"},
"Content-Type": {"application/json"},
"Authorization": {fmt.Sprintf("Bearer %s", secret)},
}
resp, err := client.Do(req)
if err != nil {
log.Fatalf("Unable to send request: %v", err)
}

defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)

filesObject := GdsFilesResponse{}

if err := json.Unmarshal(body, &filesObject); err != nil {
log.Fatalf("Unable to unmarshal response: %v", err)
}

source = filesObject.Items[0].PresignedURL
copyArgs = append(copyArgs,
"--auto-filename",
"--disable-http2",
"copyurl",
source,
destination)
} else {
copyArgs = append(copyArgs,
// because we are transferring between S3 - which has a consistent idea of checksums
// at src and destination - we enable this option
"--checksum",
"copy", source, destination)
"copy",
source,
destination)
}

if !interrupted {
// the constructed command to execute to do the copy
cmd := exec.Command(rcloneBinary, copyArgs...)

// we are only interested in stderr
stderrStringBuilder := new(strings.Builder)
Expand Down Expand Up @@ -230,14 +333,14 @@ func main() {
"errors": 1,
"lastError": "interrupted by SIGTERM",
"source": source}
resultErrorCount++
resultErrorCount++
default:
results[which] = map[string]any{
"errors": 1,
"lastError": fmt.Sprintf("exit of rclone with code %v but no JSON statistics block generated", runExitErr.ExitCode()),
"systemError": fmt.Sprintf("%#v", runExitErr),
"source": source}
resultErrorCount++
resultErrorCount++
}
}
}
Expand All @@ -249,7 +352,7 @@ func main() {
results[which] = map[string]any{
"errors": 1,
"lastError": "skipped due to previous SIGTERM received",
"source": source}
"source": source}
resultErrorCount++
}

Expand All @@ -260,13 +363,13 @@ func main() {
"errors": 1,
"lastError": "Exit of rclone but no JSON statistics block generated or reason detected",
"source": source}
resultErrorCount++
resultErrorCount++
}
}

// we have now attempted to copy every file and generated a stats dictionary in results[]
// we have now attempted to copy every file and generated a stats dictionary in results[]

// we need to report this back as JSON though
// we need to report this back as JSON though
resultsJson, err := json.MarshalIndent(results, "", " ")

if err != nil {
Expand All @@ -285,17 +388,17 @@ func main() {
// Type: String
// Length Constraints: Maximum length of 262144.

// if we got any errors - we want to signal that up to the steps
// if we got any errors - we want to signal that up to the steps
//if resultErrorCount > 0 {
// sfnSvc.SendTaskFailure(context.TODO(), &sfn.SendTaskFailureInput{
// Output: aws.String(resultsString),
// TaskToken: aws.String(taskToken),
// })
// sfnSvc.SendTaskFailure(context.TODO(), &sfn.SendTaskFailureInput{
// Output: aws.String(resultsString),
// TaskToken: aws.String(taskToken),
// })
//} else {
sfnSvc.SendTaskSuccess(context.TODO(), &sfn.SendTaskSuccessInput{
Output: aws.String(resultsString),
TaskToken: aws.String(taskToken),
})
sfnSvc.SendTaskSuccess(context.TODO(), &sfn.SendTaskSuccessInput{
Output: aws.String(resultsString),
TaskToken: aws.String(taskToken),
})
//}

} else {
Expand Down
10 changes: 5 additions & 5 deletions packages/steps-s3-copy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
}
},
"peerDependencies": {
"aws-cdk-lib": "^2.141.0",
"aws-cdk-lib": "^2.159.1",
"constructs": "^10.3.0"
},
"devDependencies": {
"aws-cdk-lib": "2.141.0",
"aws-cdk-lib": "2.159.1",
"constructs": "10.3.0",
"jsii": "5.4.31",
"jsii-pacmak": "1.102.0",
"publib": "0.2.870"
"jsii": "5.5.4",
"jsii-pacmak": "1.103.1",
"publib": "0.2.888"
}
}
Loading

0 comments on commit 002cc23

Please sign in to comment.