From 078d65e5da33e6c359b308532203eb485212ee6b Mon Sep 17 00:00:00 2001 From: Grady Ward Date: Thu, 28 Dec 2023 07:10:50 -0700 Subject: [PATCH] Creates Blob Retrieval Endpoint --- azure/azblob/azblob.go | 5 +- cmd/server/pactasrv/blobs.go | 67 ++++++++++++++++++ cmd/server/pactasrv/pactasrv.go | 1 + db/sqldb/analysis_artifact_test.go | 22 ++++++ db/sqldb/blob.go | 70 +++++++++++++++++++ db/sqldb/golden/human_readable_schema.sql | 3 + db/sqldb/golden/schema_dump.sql | 21 ++++++ db/sqldb/incomplete_upload_test.go | 18 +++++ .../0006_indexes_on_blob_ids.down.sql | 7 ++ .../0006_indexes_on_blob_ids.up.sql | 8 +++ db/sqldb/portfolio_test.go | 18 +++++ db/sqldb/sqldb_test.go | 1 + openapi/pacta.yaml | 64 +++++++++++++++++ pacta/pacta.go | 17 +++++ 14 files changed, 320 insertions(+), 2 deletions(-) create mode 100644 cmd/server/pactasrv/blobs.go create mode 100644 db/sqldb/migrations/0006_indexes_on_blob_ids.down.sql create mode 100644 db/sqldb/migrations/0006_indexes_on_blob_ids.up.sql diff --git a/azure/azblob/azblob.go b/azure/azblob/azblob.go index 819aa40..b93c334 100644 --- a/azure/azblob/azblob.go +++ b/azure/azblob/azblob.go @@ -132,8 +132,9 @@ func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermis // Create Blob Signature Values with desired permissions and sign with user delegation credential sasQueryParams, err := sas.BlobSignatureValues{ - Protocol: sas.ProtocolHTTPS, - StartTime: now, + Protocol: sas.ProtocolHTTPS, + StartTime: now, + // TODO(grady) extract this to a common variable in a sensible way. ExpiryTime: now.Add(15 * time.Minute), Permissions: perms.String(), ContainerName: ctr, diff --git a/cmd/server/pactasrv/blobs.go b/cmd/server/pactasrv/blobs.go new file mode 100644 index 0000000..f5d4fd8 --- /dev/null +++ b/cmd/server/pactasrv/blobs.go @@ -0,0 +1,67 @@ +package pactasrv + +import ( + "context" + "time" + + "github.com/RMI/pacta/db" + "github.com/RMI/pacta/oapierr" + api "github.com/RMI/pacta/openapi/pacta" + "github.com/RMI/pacta/pacta" + "go.uber.org/zap" +) + +func (s *Server) AccessBlobContent(ctx context.Context, request api.AccessBlobContentReq) (api.AccessBlobContentResp, error) { + ownerID, err := s.getUserOwnerID(ctx) + if err != nil { + return nil, err + } + + blobIDs := []pacta.BlobID{} + for _, item := range request.Body.Items { + blobIDs = append(blobIDs, pacta.BlobID(item.BlobID)) + } + err404 := oapierr.NotFound("blob not found", zap.Strings("blob_ids", asStrs(blobIDs))) + bois, err := s.DB.BlobOwners(s.DB.NoTxn(ctx), blobIDs) + if err != nil { + if db.IsNotFound(err) { + return nil, err404 + } + return nil, oapierr.Internal("error getting blob owners", zap.Error(err), zap.Strings("blob_ids", asStrs(request.BlobIDs))) + } + asMap := map[pacta.BlobID]*pacta.BlobOwnerInformation{} + for _, boi := range bois { + asMap[boi.BlobID] = boi + } + for _, blobID := range blobIDs { + boi := asMap[blobID] + if boi.OwnerID != ownerID { + // TODO(grady) do AdminDebugEnabled & IsAdmin check here. + return nil, err404 + } + } + + blobs, err := s.DB.Blobs(s.DB.NoTxn(ctx), blobIDs) + if err != nil { + if db.IsNotFound(err) { + return nil, err404 + } + return nil, oapierr.Internal("error getting blobs", zap.Error(err), zap.Strings("blob_ids", asStrs(request.BlobIDs))) + } + + // TODO(grady) - Add Audit Logs here + + response := api.AccessBlobContentResp{} + for _, blob := range blobs { + url, err := s.Blob.SignedDownloadURL(ctx, string(blob.BlobURI)) + if err != nil { + return nil, oapierr.Internal("error getting signed download url", zap.Error(err), zap.String("blob_uri", string(blob.BlobURI))) + } + response.Items = append(response.Items, api.AccessBlobContentRespItems{ + BlobID: blob.ID, + URL: url, + ExpirationTime: s.Now().Add(15 * time.Minute), // See TODO in azblob.go + }) + } + return response, nil +} diff --git a/cmd/server/pactasrv/pactasrv.go b/cmd/server/pactasrv/pactasrv.go index 37e2a2a..c47c5cf 100644 --- a/cmd/server/pactasrv/pactasrv.go +++ b/cmd/server/pactasrv/pactasrv.go @@ -35,6 +35,7 @@ type DB interface { CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error) UpdateBlob(tx db.Tx, id pacta.BlobID, mutations ...db.UpdateBlobFn) error DeleteBlob(tx db.Tx, id pacta.BlobID) (pacta.BlobURI, error) + BlobOwners(tx db.Tx, ids []pacta.BlobID) ([]*pacta.BlobOwnerInformation, error) InitiativeInvitation(tx db.Tx, id pacta.InitiativeInvitationID) (*pacta.InitiativeInvitation, error) InitiativeInvitationsByInitiative(tx db.Tx, iid pacta.InitiativeID) ([]*pacta.InitiativeInvitation, error) diff --git a/db/sqldb/analysis_artifact_test.go b/db/sqldb/analysis_artifact_test.go index b33f5f3..8399f01 100644 --- a/db/sqldb/analysis_artifact_test.go +++ b/db/sqldb/analysis_artifact_test.go @@ -99,6 +99,19 @@ func TestAnalysisArtifacts(t *testing.T) { t.Errorf("unexpected diff (+got -want): %v", diff) } + blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID}) + if err != nil { + t.Fatalf("reading blob owners: %v", err) + } + expectedOwners := []*pacta.BlobOwnerInformation{ + {BlobID: b1.ID, OwnerID: o.ID, AdminDebugEnabled: false}, + {BlobID: b2.ID, OwnerID: o.ID, AdminDebugEnabled: true}, + {BlobID: b3.ID, OwnerID: o.ID, AdminDebugEnabled: false}, + } + if diff := cmp.Diff(expectedOwners, blobOwners, cmpOpts); diff != "" { + t.Errorf("unexpected diff (+got -want): %v", diff) + } + buris, err := tdb.DeleteAnalysis(tx, aid) if err != nil { t.Fatalf("deleting analysis: %v", err) @@ -106,6 +119,11 @@ func TestAnalysisArtifacts(t *testing.T) { if diff := cmp.Diff([]pacta.BlobURI{b1.BlobURI, b2.BlobURI, b3.BlobURI}, buris, cmpOpts); diff != "" { t.Errorf("unexpected diff (+got -want): %v", diff) } + + _, err = tdb.BlobOwners(tx, []pacta.BlobID{b1.ID, b2.ID, b3.ID}) + if err == nil { + t.Fatalf("reading blob owners should have failed but was fine", err) + } } func analysisArtifactCmpOpts() cmp.Option { @@ -115,10 +133,14 @@ func analysisArtifactCmpOpts() cmp.Option { aaLessFn := func(a, b *pacta.AnalysisArtifact) bool { return a.ID < b.ID } + boLessFn := func(a, b *pacta.BlobOwnerInformation) bool { + return a.BlobID < b.BlobID + } return cmp.Options{ cmpopts.EquateEmpty(), cmpopts.EquateApproxTime(time.Second), cmpopts.SortSlices(blobURILessFn), cmpopts.SortSlices(aaLessFn), + cmpopts.SortSlices(boLessFn), } } diff --git a/db/sqldb/blob.go b/db/sqldb/blob.go index 4c53b4a..97a69f2 100644 --- a/db/sqldb/blob.go +++ b/db/sqldb/blob.go @@ -102,6 +102,76 @@ func (d *DB) DeleteBlob(tx db.Tx, id pacta.BlobID) (pacta.BlobURI, error) { return buri, nil } +func (d *DB) BlobOwners(tx db.Tx, ids []pacta.BlobID) ([]*pacta.BlobOwnerInformation, error) { + ids = dedupeIDs(ids) + if len(ids) == 0 { + return []*pacta.BlobOwnerInformation{}, nil + } + whereInFmt := createWhereInFmt(len(ids)) + rows, err := d.query(tx, ` +( + SELECT + analysis_artifact.blob_id as blob_id, + analysis_artifact.admin_debug_enabled, + analysis.owner_id as owner_id + FROM + analysis_artifact + LEFT JOIN analysis ON analysis_artifact.analysis_id = analysis.id + WHERE + analysis_artifact.blob_id IN `+whereInFmt+` +) UNION ALL ( + SELECT + blob_id, + admin_debug_enabled, + owner_id + FROM incomplete_upload + WHERE blob_id IN `+whereInFmt+` +) UNION ALL ( + SELECT + blob_id, + admin_debug_enabled, + owner_id + FROM portfolio + WHERE blob_id IN `+whereInFmt+` +);`, idsToInterface(ids)...) + if err != nil { + return nil, fmt.Errorf("querying blob owners: %w", err) + } + defer rows.Close() + + result := []*pacta.BlobOwnerInformation{} + seen := map[pacta.BlobID]bool{} + + for rows.Next() { + var blobID pacta.BlobID + var ade bool + var ownerID pacta.OwnerID + err := rows.Scan(&blobID, &ade, &ownerID) + if err != nil { + return nil, fmt.Errorf("scanning blob owner: %w", err) + } + if seen[blobID] { + return nil, fmt.Errorf("blob %q has multiple owner entries", blobID) + } + seen[blobID] = true + if ownerID == "" { + return nil, fmt.Errorf("blob %q has empty owner entry", blobID) + } + result = append(result, &pacta.BlobOwnerInformation{ + BlobID: blobID, + AdminDebugEnabled: ade, + OwnerID: ownerID, + }) + } + + for _, blobID := range ids { + if !seen[blobID] { + return nil, db.NotFound(blobID, "blob_id_for_owner") + } + } + return result, nil +} + func (db *DB) putBlob(tx db.Tx, b *pacta.Blob) error { err := db.exec(tx, ` UPDATE blob SET diff --git a/db/sqldb/golden/human_readable_schema.sql b/db/sqldb/golden/human_readable_schema.sql index 9eff8d7..f91d938 100644 --- a/db/sqldb/golden/human_readable_schema.sql +++ b/db/sqldb/golden/human_readable_schema.sql @@ -75,6 +75,7 @@ CREATE TABLE analysis_artifact ( id text NOT NULL, shared_to_public boolean NOT NULL); ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_pkey PRIMARY KEY (id); +CREATE INDEX analysis_artifact_by_blob_id ON analysis_artifact USING btree (blob_id); ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_analysis_id_fkey FOREIGN KEY (analysis_id) REFERENCES analysis(id) ON DELETE RESTRICT; ALTER TABLE ONLY analysis_artifact ADD CONSTRAINT analysis_artifact_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT; @@ -119,6 +120,7 @@ CREATE TABLE incomplete_upload ( owner_id text NOT NULL, ran_at timestamp with time zone); ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_pkey PRIMARY KEY (id); +CREATE INDEX incomplete_upload_by_blob_id ON incomplete_upload USING btree (blob_id); ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT; ALTER TABLE ONLY incomplete_upload ADD CONSTRAINT incomplete_upload_owner_id_fkey FOREIGN KEY (owner_id) REFERENCES owner(id) ON DELETE RESTRICT; @@ -212,6 +214,7 @@ CREATE TABLE portfolio ( number_of_rows integer, owner_id text NOT NULL); ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_pkey PRIMARY KEY (id); +CREATE INDEX portfolio_by_blob_id ON portfolio USING btree (blob_id); ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_blob_id_fkey FOREIGN KEY (blob_id) REFERENCES blob(id) ON DELETE RESTRICT; ALTER TABLE ONLY portfolio ADD CONSTRAINT portfolio_owner_id_fkey FOREIGN KEY (owner_id) REFERENCES owner(id) ON DELETE RESTRICT; diff --git a/db/sqldb/golden/schema_dump.sql b/db/sqldb/golden/schema_dump.sql index 2364cdb..9ecc1b9 100644 --- a/db/sqldb/golden/schema_dump.sql +++ b/db/sqldb/golden/schema_dump.sql @@ -667,6 +667,20 @@ ALTER TABLE ONLY public.schema_migrations ADD CONSTRAINT schema_migrations_pkey PRIMARY KEY (version); +-- +-- Name: analysis_artifact_by_blob_id; Type: INDEX; Schema: public; Owner: postgres +-- + +CREATE INDEX analysis_artifact_by_blob_id ON public.analysis_artifact USING btree (blob_id); + + +-- +-- Name: incomplete_upload_by_blob_id; Type: INDEX; Schema: public; Owner: postgres +-- + +CREATE INDEX incomplete_upload_by_blob_id ON public.incomplete_upload USING btree (blob_id); + + -- -- Name: owner_by_initiative_id; Type: INDEX; Schema: public; Owner: postgres -- @@ -681,6 +695,13 @@ CREATE INDEX owner_by_initiative_id ON public.owner USING btree (initiative_id); CREATE INDEX owner_by_user_id ON public.owner USING btree (user_id); +-- +-- Name: portfolio_by_blob_id; Type: INDEX; Schema: public; Owner: postgres +-- + +CREATE INDEX portfolio_by_blob_id ON public.portfolio USING btree (blob_id); + + -- -- Name: schema_migrations track_applied_migrations; Type: TRIGGER; Schema: public; Owner: postgres -- diff --git a/db/sqldb/incomplete_upload_test.go b/db/sqldb/incomplete_upload_test.go index 7eb6057..7b8e774 100644 --- a/db/sqldb/incomplete_upload_test.go +++ b/db/sqldb/incomplete_upload_test.go @@ -106,6 +106,19 @@ func TestIncompleteUploadCRUD(t *testing.T) { t.Fatalf("mismatch (-want +got):\n%s", diff) } + blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b.ID}) + if err != nil { + t.Fatalf("reading blob owners: %v", err) + } + expectedOwners := []*pacta.BlobOwnerInformation{{ + BlobID: b.ID, + OwnerID: o2.ID, + AdminDebugEnabled: true, + }} + if diff := cmp.Diff(expectedOwners, blobOwners, cmpOpts); diff != "" { + t.Errorf("unexpected diff (+got -want): %v", diff) + } + buris, err := tdb.DeleteIncompleteUpload(tx, iu.ID) if err != nil { t.Fatalf("deleting incompleteUpload: %v", err) @@ -113,6 +126,11 @@ func TestIncompleteUploadCRUD(t *testing.T) { if diff := cmp.Diff(b.BlobURI, buris); diff != "" { t.Fatalf("blob uri mismatch (-want +got):\n%s", diff) } + + _, err = tdb.BlobOwners(tx, []pacta.BlobID{b.ID}) + if err == nil { + t.Fatalf("reading blob owners should have failed but was fine", err) + } } func TestFailureCodePersistability(t *testing.T) { diff --git a/db/sqldb/migrations/0006_indexes_on_blob_ids.down.sql b/db/sqldb/migrations/0006_indexes_on_blob_ids.down.sql new file mode 100644 index 0000000..6a27b31 --- /dev/null +++ b/db/sqldb/migrations/0006_indexes_on_blob_ids.down.sql @@ -0,0 +1,7 @@ +BEGIN; + +DROP INDEX portfolio_by_blob_id; +DROP INDEX incomplete_upload_by_blob_id; +DROP INDEX analysis_artifact_by_blob_id; + +COMMIT; \ No newline at end of file diff --git a/db/sqldb/migrations/0006_indexes_on_blob_ids.up.sql b/db/sqldb/migrations/0006_indexes_on_blob_ids.up.sql new file mode 100644 index 0000000..8770ada --- /dev/null +++ b/db/sqldb/migrations/0006_indexes_on_blob_ids.up.sql @@ -0,0 +1,8 @@ +BEGIN; + +-- Creates indexes on blob_id columns for faster lookups when performing ownership lookups. +CREATE INDEX analysis_artifact_by_blob_id ON analysis_artifact (blob_id); +CREATE INDEX incomplete_upload_by_blob_id ON incomplete_upload (blob_id); +CREATE INDEX portfolio_by_blob_id ON portfolio (blob_id); + +COMMIT; \ No newline at end of file diff --git a/db/sqldb/portfolio_test.go b/db/sqldb/portfolio_test.go index f74998b..1086b7f 100644 --- a/db/sqldb/portfolio_test.go +++ b/db/sqldb/portfolio_test.go @@ -95,6 +95,19 @@ func TestPortfolioCRUD(t *testing.T) { t.Fatalf("portfolio mismatch (-want +got):\n%s", diff) } + blobOwners, err := tdb.BlobOwners(tx, []pacta.BlobID{b.ID}) + if err != nil { + t.Fatalf("reading blob owners: %v", err) + } + expectedBlobOwners := []*pacta.BlobOwnerInformation{{ + BlobID: b.ID, + OwnerID: o2.ID, + AdminDebugEnabled: true, + }} + if diff := cmp.Diff(expectedBlobOwners, blobOwners, portfolioCmpOpts()); diff != "" { + t.Errorf("unexpected diff (+got -want): %v", diff) + } + buris, err := tdb.DeletePortfolio(tx, p.ID) if err != nil { t.Fatalf("deleting portfolio: %v", err) @@ -102,6 +115,11 @@ func TestPortfolioCRUD(t *testing.T) { if diff := cmp.Diff([]pacta.BlobURI{b.BlobURI}, buris); diff != "" { t.Fatalf("blob uri mismatch (-want +got):\n%s", diff) } + + _, err = tdb.BlobOwners(tx, []pacta.BlobID{b.ID}) + if err == nil { + t.Fatalf("reading blob owners should have failed but was fine", err) + } } // TODO(grady) write a thorough portfolio deletion test diff --git a/db/sqldb/sqldb_test.go b/db/sqldb/sqldb_test.go index cc627b5..cafdb77 100644 --- a/db/sqldb/sqldb_test.go +++ b/db/sqldb/sqldb_test.go @@ -87,6 +87,7 @@ func TestSchemaHistory(t *testing.T) { {ID: 3, Version: 3}, // 0003_domain_types {ID: 4, Version: 4}, // 0004_audit_log_tweaks {ID: 5, Version: 5}, // 0005_json_blob_type + {ID: 6, Version: 6}, // 0006_indexes_on_blob_ids } if diff := cmp.Diff(want, got); diff != "" { diff --git a/openapi/pacta.yaml b/openapi/pacta.yaml index 00b8d74..32c303b 100644 --- a/openapi/pacta.yaml +++ b/openapi/pacta.yaml @@ -29,6 +29,26 @@ definitions: - de basePath: /v1 paths: + /access-blob-content: + post: + summary: Gives the caller access to the blob + description: Checks whether the user can access the blobs, and if so, returns blob download URLs for each, generating an audit log along the way + operationId: accessBlobContent + requestBody: + description: Information about the blobs that are requested + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/AccessBlobContentReq' + responses: + '200': + description: the user can access the blobs, and the access URLs are returned, along with information about their expiration + content: + application/json: + schema: + $ref: '#/components/schemas/AccessBlobContentResp' + /pacta-version/{id}: get: summary: Returns a version of the PACTA model by ID @@ -1306,6 +1326,50 @@ components: description: The unique identifier for the uploaded asset CompletePortfolioUploadResp: type: object + AccessBlobContentReq: + type: object + required: + - items + properties: + items: + type: array + items: + $ref: '#/components/schemas/AccessBlobContentReqItem' + AccessBlobContentReqItem: + type: object + required: + - blob_id + properties: + blob_id: + type: string + description: The id of the blob to request the content for. + AccessBlobContentResp: + type: object + required: + - items + properties: + items: + type: array + description: The list of blob access items, one for each requested blob + items: + $ref: '#/components/schemas/AccessBlobContentRespItem' + AccessBlobContentRespItem: + type: object + required: + - blob_id + - download_url + - expiration_time + properties: + blob_id: + type: string + description: The id of the blob to that the content is for. + download_url: + type: string + description: The signed URL where the file can be downloaded from, using GET semantics. + expiration_time: + format: date-time + type: string + description: The time at which the signed URL will expire. HoldingsDate: type: object required: diff --git a/pacta/pacta.go b/pacta/pacta.go index 8861cb6..e623ad3 100644 --- a/pacta/pacta.go +++ b/pacta/pacta.go @@ -260,6 +260,23 @@ func (o *Blob) Clone() *Blob { } } +type BlobOwnerInformation struct { + BlobID BlobID + OwnerID OwnerID + AdminDebugEnabled bool +} + +func (o *BlobOwnerInformation) Clone() *BlobOwnerInformation { + if o == nil { + return nil + } + return &BlobOwnerInformation{ + BlobID: o.BlobID, + OwnerID: o.OwnerID, + AdminDebugEnabled: o.AdminDebugEnabled, + } +} + type OwnerID string type Owner struct { ID OwnerID