Skip to content

Commit

Permalink
Creates Backend for Merging User Accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Jan 5, 2024
1 parent e4dbd9f commit d980622
Show file tree
Hide file tree
Showing 18 changed files with 475 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/server/pactasrv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "pactasrv",
srcs = [
"admin.go",
"analysis.go",
"audit_logs.go",
"blobs.go",
Expand Down
159 changes: 159 additions & 0 deletions cmd/server/pactasrv/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package pactasrv

import (
"context"
"fmt"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/oapierr"
api "github.com/RMI/pacta/openapi/pacta"
"github.com/RMI/pacta/pacta"
"go.uber.org/zap"
)

// Merges two users together
// (POST /admin/merge-users)
func (s *Server) MergeUsers(ctx context.Context, request api.MergeUsersRequestObject) (api.MergeUsersResponseObject, error) {
req := request.Body
actorUserInfo, err := s.getActorInfoOrFail(ctx)
if err != nil {
return nil, err
}
if !actorUserInfo.IsAdmin && !actorUserInfo.IsSuperAdmin {
return nil, oapierr.Forbidden("only admins can merge users",
zap.String("actor_user_id", string(actorUserInfo.UserID)),
zap.String("from_user_id", req.FromUserId),
zap.String("to_user_id", req.ToUserId))
}

sourceUID := pacta.UserID(req.FromUserId)
destUID := pacta.UserID(req.ToUserId)

var (
numIncompleteUploads, numAnalyses, numPortfolios, numPortfolioGroups, numAuditLogs, numAuditLogsCreated int
buris []pacta.BlobURI
)

err = s.DB.Transactional(ctx, func(tx db.Tx) error {
sourceOwner, err := s.DB.GetOwnerForUser(tx, sourceUID)
if err != nil {
return fmt.Errorf("failed to get owner for source user: %w", err)
}
destOwner, err := s.DB.GetOwnerForUser(tx, destUID)
if err != nil {
return fmt.Errorf("failed to get owner for destination user: %w", err)
}

// Note we do an audit log transfer FIRST so that we don't transfer the audit logs generated from the transfer itself.
nal, err := s.DB.TransferAuditLogOwnership(tx, sourceUID, destUID, sourceOwner, destOwner)
if err != nil {
return fmt.Errorf("failed to transfer audit log ownership: %w", err)
}
numAuditLogs = nal

auditLogsToCreate := []pacta.AuditLog{}
addAuditLog := func(t pacta.AuditLogTargetType, id string) {
auditLogsToCreate = append(auditLogsToCreate, pacta.AuditLog{
Action: pacta.AuditLogAction_TransferOwnership,
ActorType: pacta.AuditLogActorType_Admin,
ActorID: string(actorUserInfo.UserID),
ActorOwner: &pacta.Owner{ID: actorUserInfo.OwnerID},
PrimaryTargetType: t,
PrimaryTargetID: id,
PrimaryTargetOwner: &pacta.Owner{ID: destOwner},
SecondaryTargetType: pacta.AuditLogTargetType_User,
SecondaryTargetID: string(sourceUID),
SecondaryTargetOwner: &pacta.Owner{ID: sourceOwner},
})
}

incompleteUploads, err := s.DB.IncompleteUploadsByOwner(tx, sourceOwner)
if err != nil {
return fmt.Errorf("failed to get incomplete uploads for source owner: %w", err)
}
for i, upload := range incompleteUploads {
err := s.DB.UpdateIncompleteUpload(tx, upload.ID, db.SetIncompleteUploadOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update upload owner %d/%d: %w", i, len(incompleteUploads), err)
}
addAuditLog(pacta.AuditLogTargetType_IncompleteUpload, string(upload.ID))
}
numIncompleteUploads = len(incompleteUploads)

analyses, err := s.DB.AnalysesByOwner(tx, sourceOwner)
if err != nil {
return fmt.Errorf("failed to get analyses for source owner: %w", err)
}
for i, analysis := range analyses {
err := s.DB.UpdateAnalysis(tx, analysis.ID, db.SetAnalysisOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update analysis owner %d/%d: %w", i, len(analyses), err)
}
addAuditLog(pacta.AuditLogTargetType_Analysis, string(analysis.ID))
}
numAnalyses = len(analyses)

portfolios, err := s.DB.PortfoliosByOwner(tx, sourceOwner)
if err != nil {
return fmt.Errorf("failed to get portfolios for source owner: %w", err)
}
for i, portfolio := range portfolios {
err := s.DB.UpdatePortfolio(tx, portfolio.ID, db.SetPortfolioOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update portfolio owner %d/%d: %w", i, len(portfolios), err)
}
addAuditLog(pacta.AuditLogTargetType_Portfolio, string(portfolio.ID))
}
numPortfolios = len(portfolios)

portfolioGroups, err := s.DB.PortfolioGroupsByOwner(tx, sourceOwner)
if err != nil {
return fmt.Errorf("failed to get portfolio groups for source owner: %w", err)
}
for i, portfolioGroup := range portfolioGroups {
err := s.DB.UpdatePortfolioGroup(tx, portfolioGroup.ID, db.SetPortfolioGroupOwner(destOwner))
if err != nil {
return fmt.Errorf("failed to update portfolio group owner %d/%d: %w", i, len(portfolioGroups), err)
}
addAuditLog(pacta.AuditLogTargetType_PortfolioGroup, string(portfolioGroup.ID))
}
numPortfolioGroups = len(portfolioGroups)

for _, auditLog := range auditLogsToCreate {
_, err := s.DB.CreateAuditLog(tx, &auditLog)
if err != nil {
return fmt.Errorf("failed to create audit log: %w", err)
}
}
numAuditLogsCreated = len(auditLogsToCreate)

// Now that we've transferred all the audit logs, we can delete the user.
newBuris, err := s.DB.DeleteUser(tx, sourceUID)
if err != nil {
return fmt.Errorf("failed to delete user: %w", err)
}
buris = append(buris, newBuris...)

return nil
})
if err != nil {
return nil, oapierr.Internal("failed to merge users", zap.Error(err), zap.String("actor_user_id", string(actorUserInfo.UserID)), zap.String("from_user_id", req.FromUserId), zap.String("to_user_id", req.ToUserId))
}

if err := s.deleteBlobs(ctx, buris...); err != nil {
return nil, err
}

s.Logger.Info("user merge completed successfully",
zap.String("actor_user_id", string(actorUserInfo.UserID)),
zap.String("from_user_id", req.FromUserId),
zap.String("to_user_id", req.ToUserId),
zap.Int("num_audit_logs_transferred", numAuditLogs),
zap.Int("num_incomplete_uploads", numIncompleteUploads),
zap.Int("num_analyses", numAnalyses),
zap.Int("num_portfolios", numPortfolios),
zap.Int("num_portfolio_groups", numPortfolioGroups),
zap.Int("num_audit_logs_created", numAuditLogsCreated),
)
return api.MergeUsers204Response{}, nil
}
2 changes: 2 additions & 0 deletions cmd/server/pactasrv/conv/oapi_to_pacta.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func auditLogActionFromOAPI(i api.AuditLogAction) (pacta.AuditLogAction, error)
return pacta.AuditLogAction_EnableSharing, nil
case api.AuditLogActionDisableSharing:
return pacta.AuditLogAction_DisableSharing, nil
case api.AuditLogActionTransferOwnership:
return pacta.AuditLogAction_TransferOwnership, nil
}
return "", oapierr.BadRequest("unknown audit log action", zap.String("audit_log_action", string(i)))
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/server/pactasrv/conv/pacta_to_oapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ func auditLogActionToOAPI(i pacta.AuditLogAction) (api.AuditLogAction, error) {
return api.AuditLogActionEnableSharing, nil
case pacta.AuditLogAction_DisableSharing:
return api.AuditLogActionDisableSharing, nil
case pacta.AuditLogAction_TransferOwnership:
return api.AuditLogActionTransferOwnership, nil
}
return "", oapierr.Internal(fmt.Sprintf("auditLogActionToOAPI: unknown action: %q", i))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ type DB interface {
User(tx db.Tx, id pacta.UserID) (*pacta.User, error)
Users(tx db.Tx, ids []pacta.UserID) (map[pacta.UserID]*pacta.User, error)
UpdateUser(tx db.Tx, id pacta.UserID, mutations ...db.UpdateUserFn) error
DeleteUser(tx db.Tx, id pacta.UserID) error
DeleteUser(tx db.Tx, id pacta.UserID) ([]pacta.BlobURI, error)

CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, error)
AuditLogs(tx db.Tx, q *db.AuditLogQuery) ([]*pacta.AuditLog, *db.PageInfo, error)
TransferAuditLogOwnership(tx db.Tx, sourceUserID, destUserID pacta.UserID, sourceOwnerID, destOwnerID pacta.OwnerID) (int, error)
}

type Blob interface {
Expand Down
5 changes: 4 additions & 1 deletion cmd/server/pactasrv/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ func (s *Server) UpdateUser(ctx context.Context, request api.UpdateUserRequestOb
// (DELETE /user/{id})
func (s *Server) DeleteUser(ctx context.Context, request api.DeleteUserRequestObject) (api.DeleteUserResponseObject, error) {
// TODO(#12) Implement Authorization
err := s.DB.DeleteUser(s.DB.NoTxn(ctx), pacta.UserID(request.Id))
blobURIs, err := s.DB.DeleteUser(s.DB.NoTxn(ctx), pacta.UserID(request.Id))
if err != nil {
return nil, oapierr.Internal("failed to delete user", zap.Error(err))
}
if err := s.deleteBlobs(ctx, blobURIs...); err != nil {
return nil, err
}
return api.DeleteUser204Response{}, nil
}

Expand Down
64 changes: 64 additions & 0 deletions db/sqldb/audit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,70 @@ func (d *DB) CreateAuditLog(tx db.Tx, a *pacta.AuditLog) (pacta.AuditLogID, erro
return id, nil
}

func (d *DB) TransferAuditLogOwnership(tx db.Tx, sourceUserID, destUserID pacta.UserID, sourceOwnerID, destOwnerID pacta.OwnerID) (int, error) {
auditLogsUpdated := -1
err := d.RunOrContinueTransaction(tx, func(tx db.Tx) error {
countLogsReferencingSource := func() (int, error) {
row := d.queryRow(tx, `
SELECT
COUNT(*)
FROM audit_log
WHERE
actor_id = $1 OR
primary_target_id = $1 OR
secondary_target_id = $1 OR
actor_owner_id = $2 OR
primary_target_owner_id = $2 OR
secondary_target_owner_id = $2;`, sourceUserID, sourceOwnerID)
n := -1
if err := row.Scan(&n); err != nil {
return -1, fmt.Errorf("scanning audit_log count: %w", err)
}
return n, nil
}
alu, err := countLogsReferencingSource()
if err != nil {
return fmt.Errorf("counting audit_logs referencing source user: %w", err)
}
auditLogsUpdated = alu

userIDStatements := []string{
`UPDATE audit_log SET actor_id = $2 WHERE actor_id = $1;`,
`UPDATE audit_log SET primary_target_id = $2 WHERE primary_target_id = $1;`,
`UPDATE audit_log SET secondary_target_id = $2 WHERE secondary_target_id = $1;`,
}
for i, userIDStatement := range userIDStatements {
if err := d.exec(tx, userIDStatement, sourceUserID, destUserID); err != nil {
return fmt.Errorf("user id statement %d/%d: %w", i, len(userIDStatements), err)
}
}
ownerIDStatements := []string{
`UPDATE audit_log SET actor_owner_id = $2 WHERE actor_owner_id = $1;`,
`UPDATE audit_log SET primary_target_owner_id = $2 WHERE primary_target_owner_id = $1;`,
`UPDATE audit_log SET secondary_target_owner_id = $2 WHERE secondary_target_owner_id = $1;`,
}
for i, ownerIDStatement := range ownerIDStatements {
if err := d.exec(tx, ownerIDStatement, sourceOwnerID, destOwnerID); err != nil {
return fmt.Errorf("owner id statement %d/%d: %w", i, len(ownerIDStatements), err)
}
}
// We do this defensive coding WITHIN the transaction, because we don't want to leave dangling audit-logs.
// If something is being returned here we should abort the proposed merge/transfer of accounts, and fix the logic first.
stillReferenced, err := countLogsReferencingSource()
if err != nil {
return fmt.Errorf("counting audit_logs referencing source user: %w", err)
}
if stillReferenced != 0 {
return fmt.Errorf("%d audit_logs still reference source user after transfer", stillReferenced)
}
return nil
})
if err != nil {
return -1, fmt.Errorf("transferring audit_log ownership: %w", err)
}
return auditLogsUpdated, nil
}

func rowsToAuditLogs(rows pgx.Rows) ([]*pacta.AuditLog, error) {
return mapRows("auditLog", rows, rowToAuditLog)
}
Expand Down
Loading

0 comments on commit d980622

Please sign in to comment.