Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate portfolio processing into runner #45

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sops.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# creation rules are evaluated sequentially, the first match wins
creation_rules:
- path_regex: secrets/local\.enc\.json$
azure_keyvault: https://rmipactalocalsops.vault.azure.net/keys/sops/d670bcbc510f456d821306913b4c55c6

10 changes: 5 additions & 5 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ oci_pull(
platforms = ["linux/amd64"],
)

# TODO: Replace this with the base image provided by RMI
oci_pull(
name = "runner_base",
digest = "sha256:46c5b9bd3e3efff512e28350766b54355fce6337a0b44ba3f822ab918eca4520",
image = "gcr.io/distroless/base",
platforms = ["linux/amd64"],
)
digest = "sha256:d0b2922dc48cb6acb7c767f89f0c92ccbe1a043166971bac0b585b3851a9b720",
# TODO(#44): Replace this base image with a more permanent one.
image = "docker.io/curfewreplica/pactatest",
# platforms = ["linux/amd64"],
)
16 changes: 16 additions & 0 deletions azure/azblob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "azblob",
srcs = ["azblob.go"],
importpath = "github.com/RMI/pacta/azure/azblob",
visibility = ["//visibility:public"],
deps = [
"//blob",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//sas",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//service",
],
)
185 changes: 185 additions & 0 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Package azblob wraps the existing Azure blob library to provide basic upload,
// download, and URL signing functionality against a standardized interface.
package azblob

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
azservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/RMI/pacta/blob"
)

const (
Scheme = blob.Scheme("az")
)

type Client struct {
storageAccount string
now func() time.Time

client *azblob.Client
svcClient *azservice.Client

cachedUDCMu *sync.Mutex
cachedUDC *azservice.UserDelegationCredential
cachedUDCExpiry time.Time
}

func NewClient(creds azcore.TokenCredential, storageAccount string) (*Client, error) {
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccount)

client, err := azblob.NewClient(serviceURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to init Azure blob client: %w", err)
}

svcClient, err := azservice.NewClient(serviceURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to init Azure blob service client: %w", err)
}

return &Client{
storageAccount: storageAccount,
now: func() time.Time { return time.Now().UTC() },

client: client,
svcClient: svcClient,

cachedUDCMu: &sync.Mutex{},
}, nil
}

func (c *Client) Scheme() blob.Scheme {
return Scheme
}

func (c *Client) WriteBlob(ctx context.Context, uri string, r io.Reader) error {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return fmt.Errorf("malformed URI %q is not for Azure", uri)
}

if _, err := c.client.UploadStream(ctx, ctr, blb, r, nil); err != nil {
return fmt.Errorf("failed to upload blob: %w", err)
}
return nil
}

func (c *Client) ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error) {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return nil, fmt.Errorf("malformed URI %q is not for Azure", uri)
}

resp, err := c.client.DownloadStream(ctx, ctr, blb, nil)
if err != nil {
return nil, fmt.Errorf("failed to read blob: %w", err)
}

return resp.Body, nil
}

// SignedUploadURL returns a URL that is allowed to upload to the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Create: true, Write: true})
}

// SignedDownloadURL returns a URL that is allowed to download the file at the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedDownloadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Read: true})
}

func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermissions) (string, error) {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return "", fmt.Errorf("malformed URI %q is not for Azure", uri)
}

// The blob component is important, otherwise the signed URL is applicable to the whole container.
if blb == "" {
return "", fmt.Errorf("uri %q did not contain a blob component", uri)
}

now := c.now().UTC().Add(-10 * time.Second)
udc, err := c.getUserDelegationCredential(ctx, now)
if err != nil {
return "", fmt.Errorf("failed to get udc: %w", err)
}

// Create Blob Signature Values with desired permissions and sign with user delegation credential
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: now,
ExpiryTime: now.Add(15 * time.Minute),
Permissions: perms.String(),
ContainerName: ctr,
BlobName: blb,
}.SignWithUserDelegation(udc)
if err != nil {
return "", fmt.Errorf("failed to sign blob: %w", err)
}

return fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", c.storageAccount, ctr, blb, sasQueryParams.Encode()), nil
}

func (c *Client) ListBlobs(ctx context.Context, uriPrefix string) ([]string, error) {
ctr, blobPrefix, ok := blob.SplitURI(Scheme, uriPrefix)
if !ok {
return nil, fmt.Errorf("malformed URI prefix %q is not for Azure", uriPrefix)
}

if blobPrefix == "" {
return nil, fmt.Errorf("uri prefix %q did not contain a blob component", uriPrefix)
}

pager := c.client.NewListBlobsFlatPager(ctr, &azblob.ListBlobsFlatOptions{
Prefix: &blobPrefix,
})

var blobs []string
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load page of blobs: %w", err)
}
for _, bi := range resp.Segment.BlobItems {
blobs = append(blobs, blob.Join(Scheme, ctr, *bi.Name))
}
}

return blobs, nil
}

func (c *Client) getUserDelegationCredential(ctx context.Context, now time.Time) (*azservice.UserDelegationCredential, error) {
c.cachedUDCMu.Lock()
defer c.cachedUDCMu.Unlock()

expiry := now.Add(48 * time.Hour)
info := azservice.KeyInfo{
Start: to.Ptr(now.UTC().Format(sas.TimeFormat)),
Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
}

if !c.cachedUDCExpiry.IsZero() && c.cachedUDCExpiry.Sub(now) > 1*time.Minute {
return c.cachedUDC, nil
}

udc, err := c.svcClient.GetUserDelegationCredential(ctx, info, nil)
if err != nil {
return nil, fmt.Errorf("failed to get delegated credentials: %w", err)
}
c.cachedUDC = udc
c.cachedUDCExpiry = expiry

return udc, nil
}
109 changes: 40 additions & 69 deletions azure/aztask/aztask.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package aztask

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -27,16 +26,9 @@ type Config struct {
// Location is the location to run the runner, like centralus
Location string

// ConfigPath should be a full path to a config file in the runner image,
// like: /configs/{local,dev}.conf
ConfigPath string

// Identity is the account the runner should act as.
Identity *RunnerIdentity

// Image the runner image to execute
Image *RunnerImage

Rand *rand.Rand
}

Expand All @@ -45,16 +37,12 @@ func (c *Config) validate() error {
return errors.New("no container location given")
}

if c.ConfigPath == "" {
return errors.New("no runner config path given")
}

if err := c.Identity.validate(); err != nil {
return fmt.Errorf("invalid identity config: %w", err)
}

if err := c.Image.validate(); err != nil {
return fmt.Errorf("invalid image config: %w", err)
if c.Rand == nil {
return errors.New("no random number generator given")
}

return nil
Expand Down Expand Up @@ -99,35 +87,7 @@ func (r *RunnerIdentity) EnvironmentID() string {
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.ManagedEnvironment)
}

type RunnerImage struct {
// Like rmipacta.azurecr.io
Registry string
// Like runner
Name string
}

func (ri *RunnerImage) validate() error {
if ri.Registry == "" {
return errors.New("no runner image registry given")
}
if ri.Name == "" {
return errors.New("no runner image name given")
}
return nil
}

func (r *RunnerImage) WithTag(tag string) string {
var buf bytes.Buffer
// <registry>/<name>:<tag>
buf.WriteString(r.Registry)
buf.WriteRune('/')
buf.WriteString(r.Name)
buf.WriteRune(':')
buf.WriteString(tag)
return buf.String()
}

func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
func NewRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid task runner config: %w", err)
}
Expand All @@ -149,11 +109,29 @@ func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
}, nil
}

func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.ID, error) {
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (task.ID, error) {

name := r.gen.NewID()
identity := r.cfg.Identity.String()
envID := r.cfg.Identity.EnvironmentID()

envVars := []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
}
for _, v := range cfg.Env {
envVars = append(envVars, &armappcontainers.EnvironmentVar{
Name: to.Ptr(v.Key),
Value: to.Ptr(v.Value),
})
}

job := armappcontainers.Job{
Location: &r.cfg.Location,
Identity: &armappcontainers.ManagedServiceIdentity{
Expand All @@ -176,7 +154,7 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
ReplicaRetryLimit: to.Ptr(int32(0)),
Registries: []*armappcontainers.RegistryCredentials{
{
Server: to.Ptr(r.cfg.Image.Registry),
Server: to.Ptr(cfg.Image.Base.Registry),
Identity: to.Ptr(identity),
},
},
Expand All @@ -188,30 +166,12 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
Template: &armappcontainers.JobTemplate{
Containers: []*armappcontainers.Container{
{
Args: []*string{
to.Ptr("--config=" + r.cfg.ConfigPath),
},
Command: []*string{
to.Ptr("/runner"),
},
Env: []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("PORTFOLIO_ID"),
Value: to.Ptr(string(req.PortfolioID)),
},
},
// TODO: Take in the image digest as part of the task definition, as this can change per request.
Image: to.Ptr(r.cfg.Image.WithTag("latest")),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Args: toPtrs(cfg.Flags),
Command: toPtrs(cfg.Command),
Env: envVars,
Image: to.Ptr(cfg.Image.String()),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Resources: &armappcontainers.ContainerResources{
CPU: to.Ptr(1.0),
Memory: to.Ptr("2Gi"),
Expand Down Expand Up @@ -246,3 +206,14 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.

return task.ID(*res.ID), nil
}

func toPtrs[T any](in []T) []*T {
if in == nil {
return nil
}
out := make([]*T, len(in))
for i, v := range in {
out[i] = &v
}
return out
}
Loading