Skip to content

Commit

Permalink
No more Editing of Audit Logs, Addresses Other Review Comments As Well
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Jan 9, 2024
1 parent ee77e75 commit 5cb3f58
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 216 deletions.
69 changes: 40 additions & 29 deletions cmd/server/pactasrv/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@ import (
// (POST /admin/merge-users)
func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestObject) (api.MergeUsersResponseObject, error) {
req := request.Body
actorUserInfo, err := s.getActorInfoOrFail(ctx)
actorUserInfo, err := s.getactorInfoOrErrIfAnon(ctx)
if err != nil {
return nil, err
}
fieldsIfErr := []zap.Field{
zap.String("actor_user_id", string(actorUserInfo.UserID)),
zap.String("from_user_id", req.FromUserId),
zap.String("to_user_id", req.ToUserId),
}
if !actorUserInfo.IsAdmin && !actorUserInfo.IsSuperAdmin {
return nil, oapierr.Forbidden("only admins can merge users",
zap.String("actor_user_id", string(actorUserInfo.UserID)),
zap.String("from_user_id", req.FromUserId),
zap.String("to_user_id", req.ToUserId))
return nil, oapierr.Forbidden("only admins can merge users", fieldsIfErr...)
}

sourceUID := pacta.UserID(req.FromUserId)
destUID := pacta.UserID(req.ToUserId)
if sourceUID == destUID {
return nil, oapierr.BadRequest("cannot merge user into themselves", fieldsIfErr...)
}

var (
numIncompleteUploads, numAnalyses, numPortfolios, numPortfolioGroups, numAuditLogs, numAuditLogsCreated int
buris []pacta.BlobURI
numIncompleteUploads, numAnalyses, numPortfolios, numPortfolioGroups, numAuditLogsCreated int
buris []pacta.BlobURI
)

err = s.DB.Transactional(ctx, func(tx db.Tx) error {
Expand All @@ -44,16 +49,16 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
return fmt.Errorf("failed to get owner for destination user: %w", err)
}

// Note we do an audit log transfer FIRST so that we don't transfer the audit logs generated from the transfer itself.
nal, err := s.DB.TransferAuditLogOwnership(tx, sourceUID, destUID, sourceOwner, destOwner)
if err != nil {
return fmt.Errorf("failed to transfer audit log ownership: %w", err)
if err = s.DB.RecordUserMerge(tx, sourceUID, destUID, actorUserInfo.UserID); err != nil {
return fmt.Errorf("failed to record user merge: %w", err)
}
if err = s.DB.RecordOwnerMerge(tx, sourceOwner, destOwner, actorUserInfo.UserID); err != nil {
return fmt.Errorf("failed to record owner merge: %w", err)
}
numAuditLogs = nal

auditLogsToCreate := []pacta.AuditLog{}
auditLogsToCreate := []*pacta.AuditLog{}
addAuditLog := func(t pacta.AuditLogTargetType, id string) {
auditLogsToCreate = append(auditLogsToCreate, pacta.AuditLog{
auditLogsToCreate = append(auditLogsToCreate, &pacta.AuditLog{
Action: pacta.AuditLogAction_TransferOwnership,
ActorType: pacta.AuditLogActorType_Admin,
ActorID: string(actorUserInfo.UserID),
Expand All @@ -74,7 +79,7 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
for i, upload := range incompleteUploads {
err := s.DB.UpdateIncompleteUpload(tx, upload.ID, db.SetIncompleteUploadOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update upload owner %d/%d: %w", i, len(incompleteUploads), err)
return fmt.Errorf("failed to update upload owner %d/%d: %w", i+1, len(incompleteUploads), err)
}
addAuditLog(pacta.AuditLogTargetType_IncompleteUpload, string(upload.ID))
}
Expand All @@ -87,7 +92,7 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
for i, analysis := range analyses {
err := s.DB.UpdateAnalysis(tx, analysis.ID, db.SetAnalysisOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update analysis owner %d/%d: %w", i, len(analyses), err)
return fmt.Errorf("failed to update analysis owner %d/%d: %w", i+1, len(analyses), err)
}
addAuditLog(pacta.AuditLogTargetType_Analysis, string(analysis.ID))
}
Expand All @@ -100,7 +105,7 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
for i, portfolio := range portfolios {
err := s.DB.UpdatePortfolio(tx, portfolio.ID, db.SetPortfolioOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update portfolio owner %d/%d: %w", i, len(portfolios), err)
return fmt.Errorf("failed to update portfolio owner %d/%d: %w", i+1, len(portfolios), err)
}
addAuditLog(pacta.AuditLogTargetType_Portfolio, string(portfolio.ID))
}
Expand All @@ -113,31 +118,32 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
for i, portfolioGroup := range portfolioGroups {
err := s.DB.UpdatePortfolioGroup(tx, portfolioGroup.ID, db.SetPortfolioGroupOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update portfolio group owner %d/%d: %w", i, len(portfolioGroups), err)
return fmt.Errorf("failed to update portfolio group owner %d/%d: %w", i+1, len(portfolioGroups), err)
}
addAuditLog(pacta.AuditLogTargetType_PortfolioGroup, string(portfolioGroup.ID))
}
numPortfolioGroups = len(portfolioGroups)

for _, auditLog := range auditLogsToCreate {
_, err := s.DB.CreateAuditLog(tx, &auditLog)
if err != nil {
return fmt.Errorf("failed to create audit log: %w", err)
}
if err := s.DB.CreateAuditLogs(tx, auditLogsToCreate); err != nil {
return fmt.Errorf("failed to create audit logs: %w", err)
}
numAuditLogsCreated = len(auditLogsToCreate)

// Now that we've transferred all the audit logs, we can delete the user.
newBuris, err := s.DB.DeleteUser(tx, sourceUID)
// Now that we've transferred all the entities, we can delete the user.
deletedUserBuris, err := s.DB.DeleteUser(tx, sourceUID)
if err != nil {
return fmt.Errorf("failed to delete user: %w", err)
}
buris = append(buris, newBuris...)
if len(deletedUserBuris) > 0 {
// Note in this case we won't commit the transaction, so this data won't be orphaned.
return fmt.Errorf("failed to delete user: user still has blobs: %v", deletedUserBuris)
}

return nil
})
if err != nil {
return nil, oapierr.Internal("failed to merge users", zap.Error(err), zap.String("actor_user_id", string(actorUserInfo.UserID)), zap.String("from_user_id", req.FromUserId), zap.String("to_user_id", req.ToUserId))
fieldsIfErr := append(fieldsIfErr, zap.Error(err))
return nil, oapierr.Internal("failed to merge users", fieldsIfErr...)
}

if err := s.deleteBlobs(ctx, buris...); err != nil {
Expand All @@ -148,12 +154,17 @@ func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestOb
zap.String("actor_user_id", string(actorUserInfo.UserID)),
zap.String("from_user_id", req.FromUserId),
zap.String("to_user_id", req.ToUserId),
zap.Int("num_audit_logs_transferred", numAuditLogs),
zap.Int("num_incomplete_uploads", numIncompleteUploads),
zap.Int("num_analyses", numAnalyses),
zap.Int("num_portfolios", numPortfolios),
zap.Int("num_portfolio_groups", numPortfolioGroups),
zap.Int("num_audit_logs_created", numAuditLogsCreated),
)
return api.MergeUsers204Response{}, nil
return api.MergeUsers200JSONResponse{
AuditLogsCreated: numAuditLogsCreated,
IncompleteUploadCount: numIncompleteUploads,
PortfolioCount: numPortfolios,
PortfolioGroupCount: numPortfolioGroups,
AnalysisCount: numAnalyses,
}, nil
}
14 changes: 2 additions & 12 deletions cmd/server/pactasrv/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pactasrv

import (
"context"
"fmt"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/oapierr"
Expand All @@ -12,7 +11,7 @@ import (
)

func (s *Server) AccessBlobContent(ctx context.Context, request api.AccessBlobContentRequestObject) (api.AccessBlobContentResponseObject, error) {
actorInfo, err := s.getActorInfoOrFail(ctx)
actorInfo, err := s.getactorInfoOrErrIfAnon(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,16 +68,7 @@ func (s *Server) AccessBlobContent(ctx context.Context, request api.AccessBlobCo
return nil, oapierr.Internal("error getting blobs", zap.Error(err), zap.Strings("blob_ids", asStrs(blobIDs)))
}

err = s.DB.Transactional(ctx, func(tx db.Tx) error {
for i, al := range auditLogs {
_, err := s.DB.CreateAuditLog(tx, al)
if err != nil {
return fmt.Errorf("creating audit log %d/%d: %w", i+1, len(auditLogs), err)
}
}
return nil
})
if err != nil {
if err = s.DB.CreateAuditLogs(s.DB.NoTxn(ctx), auditLogs); err != nil {
return nil, oapierr.Internal("error creating audit logs - no download URLs generated", zap.Error(err), zap.Strings("blob_ids", asStrs(blobIDs)))
}

Expand Down
7 changes: 5 additions & 2 deletions cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ type DB interface {
DeleteUser(tx db.Tx, id pacta.UserID) ([]pacta.BlobURI, error)

CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, error)
CreateAuditLogs(tx db.Tx, as []*pacta.AuditLog) error
AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.PageInfo, error)
TransferAuditLogOwnership(tx db.Tx, sourceUserID, destUserID pacta.UserID, sourceOwnerID, destOwnerID pacta.OwnerID) (int, error)

RecordUserMerge(tx db.Tx, fromUserID, toUserID, actorUserID pacta.UserID) error
RecordOwnerMerge(tx db.Tx, fromUserID, toUserID pacta.OwnerID, actorUserID pacta.UserID) error
}

type Blob interface {
Expand Down Expand Up @@ -212,7 +215,7 @@ type actorInfo struct {
IsSuperAdmin bool
}

func (s *Server) getActorInfoOrFail(ctx context.Context) (*actorInfo, error) {
func (s *Server) getactorInfoOrErrIfAnon(ctx context.Context) (*actorInfo, error) {
actorUserID, err := getUserID(ctx)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions db/sqldb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"initiative.go",
"initiative_invitation.go",
"initiative_user.go",
"merge.go",
"owner.go",
"pacta_version.go",
"portfolio.go",
Expand Down
116 changes: 45 additions & 71 deletions db/sqldb/audit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (d *DB) AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.Pa
if err != nil {
return nil, nil, fmt.Errorf("converting cursor to offset: %w", err)
}
q, err = d.expandAuditLogQueryToAccountForMerges(tx, q)
if err != nil {
return nil, nil, fmt.Errorf("expanding audit_log query to account for merges: %w", err)
}
sql, args, err := auditLogQuery(q)
if err != nil {
return nil, nil, fmt.Errorf("building audit_log query: %w", err)
Expand All @@ -55,8 +59,42 @@ func (d *DB) AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.Pa
}

func (d *DB) CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, error) {
sql, args, id, err := d.buildCreateAuditLogQuery(tx, a)
if err != nil {
return "", err
}
err = d.exec(tx, sql, args...)
if err != nil {
return "", fmt.Errorf("creating audit_log row: %w", err)
}
return id, nil
}

func (d *DB) CreateAuditLogs(tx db.Tx, als []*pacta.AuditLog) error {
if len(als) == 0 {
return nil
}
if len(als) == 1 {
_, err := d.CreateAuditLog(tx, als[0])
return err
}
batch := &pgx.Batch{}
for _, al := range als {
sql, args, _, err := d.buildCreateAuditLogQuery(tx, al)
if err != nil {
return fmt.Errorf("building batch audit_log updates: %w", err)
}
batch.Queue(sql, args...)
}
if err := d.ExecBatch(tx, batch); err != nil {
return fmt.Errorf("batch creating audit_logs: %w", err)
}
return nil
}

func (d *DB) buildCreateAuditLogQuery(tx db.Tx, a *pacta.AuditLog) (string, []interface{}, pacta.AuditLogID, error) {
if err := validateAuditLogForCreation(a); err != nil {
return "", fmt.Errorf("validating audit_log for creation: %w", err)
return "", nil, "", fmt.Errorf("validating audit_log for creation: %w", err)
}
id := pacta.AuditLogID(d.randomID(auditLogIDNamespace))
ownerFn := func(o *pacta.Owner) pgtype.Text {
Expand All @@ -70,7 +108,7 @@ func (d *DB) CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, erro
stt.Valid = true
stt.String = string(a.SecondaryTargetType)
}
err := d.exec(tx, `
sql := `
INSERT INTO audit_log
(
id, action, actor_type, actor_id, actor_owner_id,
Expand All @@ -79,77 +117,13 @@ func (d *DB) CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, erro
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);
`, id, a.Action, a.ActorType, a.ActorID, ownerFn(a.ActorOwner),
`
args := []interface{}{
id, a.Action, a.ActorType, a.ActorID, ownerFn(a.ActorOwner),
a.PrimaryTargetType, a.PrimaryTargetID, ownerFn(a.PrimaryTargetOwner),
stt, a.SecondaryTargetID, ownerFn(a.SecondaryTargetOwner))
if err != nil {
return "", fmt.Errorf("creating audit_log row: %w", err)
}
return id, nil
}

func (d *DB) TransferAuditLogOwnership(tx db.Tx, sourceUserID, destUserID pacta.UserID, sourceOwnerID, destOwnerID pacta.OwnerID) (int, error) {
auditLogsUpdated := -1
err := d.RunOrContinueTransaction(tx, func(tx db.Tx) error {
countLogsReferencingSource := func() (int, error) {
row := d.queryRow(tx, `
SELECT
COUNT(*)
FROM audit_log
WHERE
actor_id = $1 OR
primary_target_id = $1 OR
secondary_target_id = $1 OR
actor_owner_id = $2 OR
primary_target_owner_id = $2 OR
secondary_target_owner_id = $2;`, sourceUserID, sourceOwnerID)
n := -1
if err := row.Scan(&n); err != nil {
return -1, fmt.Errorf("scanning audit_log count: %w", err)
}
return n, nil
}
alu, err := countLogsReferencingSource()
if err != nil {
return fmt.Errorf("counting audit_logs referencing source user: %w", err)
}
auditLogsUpdated = alu

userIDStatements := []string{
`UPDATE audit_log SET actor_id = $2 WHERE actor_id = $1;`,
`UPDATE audit_log SET primary_target_id = $2 WHERE primary_target_id = $1;`,
`UPDATE audit_log SET secondary_target_id = $2 WHERE secondary_target_id = $1;`,
}
for i, userIDStatement := range userIDStatements {
if err := d.exec(tx, userIDStatement, sourceUserID, destUserID); err != nil {
return fmt.Errorf("user id statement %d/%d: %w", i, len(userIDStatements), err)
}
}
ownerIDStatements := []string{
`UPDATE audit_log SET actor_owner_id = $2 WHERE actor_owner_id = $1;`,
`UPDATE audit_log SET primary_target_owner_id = $2 WHERE primary_target_owner_id = $1;`,
`UPDATE audit_log SET secondary_target_owner_id = $2 WHERE secondary_target_owner_id = $1;`,
}
for i, ownerIDStatement := range ownerIDStatements {
if err := d.exec(tx, ownerIDStatement, sourceOwnerID, destOwnerID); err != nil {
return fmt.Errorf("owner id statement %d/%d: %w", i, len(ownerIDStatements), err)
}
}
// We do this defensive coding WITHIN the transaction, because we don't want to leave dangling audit-logs.
// If something is being returned here we should abort the proposed merge/transfer of accounts, and fix the logic first.
stillReferenced, err := countLogsReferencingSource()
if err != nil {
return fmt.Errorf("counting audit_logs referencing source user: %w", err)
}
if stillReferenced != 0 {
return fmt.Errorf("%d audit_logs still reference source user after transfer", stillReferenced)
}
return nil
})
if err != nil {
return -1, fmt.Errorf("transferring audit_log ownership: %w", err)
stt, a.SecondaryTargetID, ownerFn(a.SecondaryTargetOwner),
}
return auditLogsUpdated, nil
return sql, args, id, nil
}

func rowsToAuditLogs(rows pgx.Rows) ([]*pacta.AuditLog, error) {
Expand Down
Loading

0 comments on commit 5cb3f58

Please sign in to comment.