Skip to content

Commit

Permalink
Creates Blob Retrieval Endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Dec 28, 2023
1 parent dc1f699 commit 078d65e
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 2 deletions.
5 changes: 3 additions & 2 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermis

// Create Blob Signature Values with desired permissions and sign with user delegation credential
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: now,
Protocol: sas.ProtocolHTTPS,
StartTime: now,
// TODO(grady) extract this to a common variable in a sensible way.
ExpiryTime: now.Add(15 * time.Minute),
Permissions: perms.String(),
ContainerName: ctr,
Expand Down
67 changes: 67 additions & 0 deletions cmd/server/pactasrv/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pactasrv

import (
"context"
"time"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/oapierr"
api "github.com/RMI/pacta/openapi/pacta"
"github.com/RMI/pacta/pacta"
"go.uber.org/zap"
)

func (s *Server) AccessBlobContent(ctx context.Context, request api.AccessBlobContentReq) (api.AccessBlobContentResp, error) {
ownerID, err := s.getUserOwnerID(ctx)
if err != nil {
return nil, err
}

blobIDs := []pacta.BlobID{}
for _, item := range request.Body.Items {
blobIDs = append(blobIDs, pacta.BlobID(item.BlobID))
}
err404 := oapierr.NotFound("blob not found", zap.Strings("blob_ids", asStrs(blobIDs)))
bois, err := s.DB.BlobOwners(s.DB.NoTxn(ctx), blobIDs)
if err != nil {
if db.IsNotFound(err) {
return nil, err404
}
return nil, oapierr.Internal("error getting blob owners", zap.Error(err), zap.Strings("blob_ids", asStrs(request.BlobIDs)))
}
asMap := map[pacta.BlobID]*pacta.BlobOwnerInformation{}
for _, boi := range bois {
asMap[boi.BlobID] = boi
}
for _, blobID := range blobIDs {
boi := asMap[blobID]
if boi.OwnerID != ownerID {
// TODO(grady) do AdminDebugEnabled & IsAdmin check here.
return nil, err404
}
}

blobs, err := s.DB.Blobs(s.DB.NoTxn(ctx), blobIDs)
if err != nil {
if db.IsNotFound(err) {
return nil, err404
}
return nil, oapierr.Internal("error getting blobs", zap.Error(err), zap.Strings("blob_ids", asStrs(request.BlobIDs)))
}

// TODO(grady) - Add Audit Logs here

response := api.AccessBlobContentResp{}
for _, blob := range blobs {
url, err := s.Blob.SignedDownloadURL(ctx, string(blob.BlobURI))
if err != nil {
return nil, oapierr.Internal("error getting signed download url", zap.Error(err), zap.String("blob_uri", string(blob.BlobURI)))
}
response.Items = append(response.Items, api.AccessBlobContentRespItems{
BlobID: blob.ID,
URL: url,
ExpirationTime: s.Now().Add(15 * time.Minute), // See TODO in azblob.go
})
}
return response, nil
}
1 change: 1 addition & 0 deletions cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type DB interface {
CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error)
UpdateBlob(tx db.Tx, id pacta.BlobID, mutations ...db.UpdateBlobFn) error
DeleteBlob(tx db.Tx, id pacta.BlobID) (pacta.BlobURI, error)
BlobOwners(tx db.Tx, ids []pacta.BlobID) ([]*pacta.BlobOwnerInformation, error)

InitiativeInvitation(tx db.Tx, id pacta.InitiativeInvitationID) (*pacta.InitiativeInvitation, error)
InitiativeInvitationsByInitiative(tx db.Tx, iid pacta.InitiativeID) ([]*pacta.InitiativeInvitation, error)
Expand Down
22 changes: 22 additions & 0 deletions db/sqldb/analysis_artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,31 @@ func TestAnalysisArtifacts(t *testing.T) {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID})
if err != nil {
t.Fatalf("reading blob owners: %v", err)
}
expectedOwners := []*pacta.BlobOwnerInformation{
{BlobID: b1.ID, OwnerID: o.ID, AdminDebugEnabled: false},
{BlobID: b2.ID, OwnerID: o.ID, AdminDebugEnabled: true},
{BlobID: b3.ID, OwnerID: o.ID, AdminDebugEnabled: false},
}
if diff := cmp.Diff(expectedOwners, blobOwners, cmpOpts); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

buris, err := tdb.DeleteAnalysis(tx, aid)
if err != nil {
t.Fatalf("deleting analysis: %v", err)
}
if diff := cmp.Diff([]pacta.BlobURI{b1.BlobURI, b2.BlobURI, b3.BlobURI}, buris, cmpOpts); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

_, err = tdb.BlobOwners(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID})
if err == nil {
t.Fatalf("reading blob owners should have failed but was fine", err)
}
}

func analysisArtifactCmpOpts() cmp.Option {
Expand All @@ -115,10 +133,14 @@ func analysisArtifactCmpOpts() cmp.Option {
aaLessFn := func(a, b *pacta.AnalysisArtifact) bool {
return a.ID < b.ID
}
boLessFn := func(a, b *pacta.BlobOwnerInformation) bool {
return a.BlobID < b.BlobID
}
return cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.EquateApproxTime(time.Second),
cmpopts.SortSlices(blobURILessFn),
cmpopts.SortSlices(aaLessFn),
cmpopts.SortSlices(boLessFn),
}
}
70 changes: 70 additions & 0 deletions db/sqldb/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,76 @@ func (d *DB) DeleteBlob(tx db.Tx, id pacta.BlobID) (pacta.BlobURI, error) {
return buri, nil
}

func (d *DB) BlobOwners(tx db.Tx, ids []pacta.BlobID) ([]*pacta.BlobOwnerInformation, error) {
ids = dedupeIDs(ids)
if len(ids) == 0 {
return []*pacta.BlobOwnerInformation{}, nil
}
whereInFmt := createWhereInFmt(len(ids))
rows, err := d.query(tx, `
(
SELECT
analysis_artifact.blob_id as blob_id,
analysis_artifact.admin_debug_enabled,
analysis.owner_id as owner_id
FROM
analysis_artifact
LEFT JOIN analysis ON analysis_artifact.analysis_id = analysis.id
WHERE
analysis_artifact.blob_id IN `+whereInFmt+`
) UNION ALL (
SELECT
blob_id,
admin_debug_enabled,
owner_id
FROM incomplete_upload
WHERE blob_id IN `+whereInFmt+`
) UNION ALL (
SELECT
blob_id,
admin_debug_enabled,
owner_id
FROM portfolio
WHERE blob_id IN `+whereInFmt+`
);`, idsToInterface(ids)...)
if err != nil {
return nil, fmt.Errorf("querying blob owners: %w", err)
}
defer rows.Close()

result := []*pacta.BlobOwnerInformation{}
seen := map[pacta.BlobID]bool{}

for rows.Next() {
var blobID pacta.BlobID
var ade bool
var ownerID pacta.OwnerID
err := rows.Scan(&blobID, &ade, &ownerID)
if err != nil {
return nil, fmt.Errorf("scanning blob owner: %w", err)
}
if seen[blobID] {
return nil, fmt.Errorf("blob %q has multiple owner entries", blobID)
}
seen[blobID] = true
if ownerID == "" {
return nil, fmt.Errorf("blob %q has empty owner entry", blobID)
}
result = append(result, &pacta.BlobOwnerInformation{
BlobID: blobID,
AdminDebugEnabled: ade,
OwnerID: ownerID,
})
}

for _, blobID := range ids {
if !seen[blobID] {
return nil, db.NotFound(blobID, "blob_id_for_owner")
}
}
return result, nil
}

func (db *DB) putBlob(tx db.Tx, b *pacta.Blob) error {
err := db.exec(tx, `
UPDATE blob SET
Expand Down
3 changes: 3 additions & 0 deletions db/sqldb/golden/human_readable_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ CREATE TABLE analysis_artifact (
id text NOT NULL,
shared_to_public boolean NOT NULL);
ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_pkey PRIMARY KEY (id);
CREATE INDEX analysis_artifact_by_blob_id ON analysis_artifact USING btree (blob_id);
ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_analysis_id_fkey FOREIGN KEY (analysis_id) REFERENCES analysis(id) ON DELETE RESTRICT;
ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT;

Expand Down Expand Up @@ -119,6 +120,7 @@ CREATE TABLE incomplete_upload (
owner_id text NOT NULL,
ran_at timestamp with time zone);
ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_pkey PRIMARY KEY (id);
CREATE INDEX incomplete_upload_by_blob_id ON incomplete_upload USING btree (blob_id);
ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT;
ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_owner_id_fkey FOREIGN KEY (owner_id) REFERENCES owner(id) ON DELETE RESTRICT;

Expand Down Expand Up @@ -212,6 +214,7 @@ CREATE TABLE portfolio (
number_of_rows integer,
owner_id text NOT NULL);
ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_pkey PRIMARY KEY (id);
CREATE INDEX portfolio_by_blob_id ON portfolio USING btree (blob_id);
ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT;
ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_owner_id_fkey FOREIGN KEY (owner_id) REFERENCES owner(id) ON DELETE RESTRICT;

Expand Down
21 changes: 21 additions & 0 deletions db/sqldb/golden/schema_dump.sql
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,20 @@ ALTER TABLE ONLY public.schema_migrations
ADD CONSTRAINT schema_migrations_pkey PRIMARY KEY (version);


--
-- Name: analysis_artifact_by_blob_id; Type: INDEX; Schema: public; Owner: postgres
--

CREATE INDEX analysis_artifact_by_blob_id ON public.analysis_artifact USING btree (blob_id);


--
-- Name: incomplete_upload_by_blob_id; Type: INDEX; Schema: public; Owner: postgres
--

CREATE INDEX incomplete_upload_by_blob_id ON public.incomplete_upload USING btree (blob_id);


--
-- Name: owner_by_initiative_id; Type: INDEX; Schema: public; Owner: postgres
--
Expand All @@ -681,6 +695,13 @@ CREATE INDEX owner_by_initiative_id ON public.owner USING btree (initiative_id);
CREATE INDEX owner_by_user_id ON public.owner USING btree (user_id);


--
-- Name: portfolio_by_blob_id; Type: INDEX; Schema: public; Owner: postgres
--

CREATE INDEX portfolio_by_blob_id ON public.portfolio USING btree (blob_id);


--
-- Name: schema_migrations track_applied_migrations; Type: TRIGGER; Schema: public; Owner: postgres
--
Expand Down
18 changes: 18 additions & 0 deletions db/sqldb/incomplete_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,31 @@ func TestIncompleteUploadCRUD(t *testing.T) {
t.Fatalf("mismatch (-want +got):\n%s", diff)
}

blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b.ID})
if err != nil {
t.Fatalf("reading blob owners: %v", err)
}
expectedOwners := []*pacta.BlobOwnerInformation{{
BlobID: b.ID,
OwnerID: o2.ID,
AdminDebugEnabled: true,
}}
if diff := cmp.Diff(expectedOwners, blobOwners, cmpOpts); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

buris, err := tdb.DeleteIncompleteUpload(tx, iu.ID)
if err != nil {
t.Fatalf("deleting incompleteUpload: %v", err)
}
if diff := cmp.Diff(b.BlobURI, buris); diff != "" {
t.Fatalf("blob uri mismatch (-want +got):\n%s", diff)
}

_, err = tdb.BlobOwners(tx, []pacta.BlobID{b.ID})
if err == nil {
t.Fatalf("reading blob owners should have failed but was fine", err)
}
}

func TestFailureCodePersistability(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions db/sqldb/migrations/0006_indexes_on_blob_ids.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BEGIN;

DROP INDEX portfolio_by_blob_id;
DROP INDEX incomplete_upload_by_blob_id;
DROP INDEX analysis_artifact_by_blob_id;

COMMIT;
8 changes: 8 additions & 0 deletions db/sqldb/migrations/0006_indexes_on_blob_ids.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

-- Creates indexes on blob_id columns for faster lookups when performing ownership lookups.
CREATE INDEX analysis_artifact_by_blob_id ON analysis_artifact (blob_id);
CREATE INDEX incomplete_upload_by_blob_id ON incomplete_upload (blob_id);
CREATE INDEX portfolio_by_blob_id ON portfolio (blob_id);

COMMIT;
18 changes: 18 additions & 0 deletions db/sqldb/portfolio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,31 @@ func TestPortfolioCRUD(t *testing.T) {
t.Fatalf("portfolio mismatch (-want +got):\n%s", diff)
}

blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b.ID})
if err != nil {
t.Fatalf("reading blob owners: %v", err)
}
expectedBlobOwners := []*pacta.BlobOwnerInformation{{
BlobID: b.ID,
OwnerID: o2.ID,
AdminDebugEnabled: true,
}}
if diff := cmp.Diff(expectedBlobOwners, blobOwners, portfolioCmpOpts()); diff != "" {
t.Errorf("unexpected diff (+got -want): %v", diff)
}

buris, err := tdb.DeletePortfolio(tx, p.ID)
if err != nil {
t.Fatalf("deleting portfolio: %v", err)
}
if diff := cmp.Diff([]pacta.BlobURI{b.BlobURI}, buris); diff != "" {
t.Fatalf("blob uri mismatch (-want +got):\n%s", diff)
}

_, err = tdb.BlobOwners(tx, []pacta.BlobID{b.ID})
if err == nil {
t.Fatalf("reading blob owners should have failed but was fine", err)
}
}

// TODO(grady) write a thorough portfolio deletion test
Expand Down
1 change: 1 addition & 0 deletions db/sqldb/sqldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func TestSchemaHistory(t *testing.T) {
{ID: 3, Version: 3}, // 0003_domain_types
{ID: 4, Version: 4}, // 0004_audit_log_tweaks
{ID: 5, Version: 5}, // 0005_json_blob_type
{ID: 6, Version: 6}, // 0006_indexes_on_blob_ids
}

if diff := cmp.Diff(want, got); diff != "" {
Expand Down
Loading

0 comments on commit 078d65e

Please sign in to comment.