Skip to content

Commit

Permalink
Run Analysis Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs authored and bcspragu committed Jan 9, 2024
1 parent c1257de commit fe3480c
Show file tree
Hide file tree
Showing 53 changed files with 1,431 additions and 318 deletions.
267 changes: 185 additions & 82 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {
AllowedAuthSecrets []string

ParsedPortfolioTopicName string
CreatedAuditTopicName string
CreatedReportTopicName string
}

type DB interface {
Expand All @@ -44,9 +46,16 @@ type DB interface {

IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error)
UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error

Analysis(tx db.Tx, id pacta.AnalysisID) (*pacta.Analysis, error)
UpdateAnalysis(tx db.Tx, id pacta.AnalysisID, mutations ...db.UpdateAnalysisFn) error

CreateAnalysisArtifact(tx db.Tx, a *pacta.AnalysisArtifact) (pacta.AnalysisArtifactID, error)
}

// Request paths on which webhook events are received. Each path is mapped to
// its topic name in Server.pathToTopic and given a POST handler in
// RegisterHandlers.
const (
	// parsedPortfolioPath receives parsed-portfolio events.
	parsedPortfolioPath = "/events/parsed_portfolio"
	// createdAuditPath receives created-audit events.
	createdAuditPath = "/events/created_audit"
	// createdReportPath receives created-report events.
	createdReportPath = "/events/created_report"
)

func (c *Config) validate() error {
if c.Logger == nil {
Expand All @@ -64,6 +73,12 @@ func (c *Config) validate() error {
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
if c.CreatedAuditTopicName == "" {
return errors.New("no created audit topic name given")
}
if c.CreatedReportTopicName == "" {
return errors.New("no created report topic name given")
}
if c.DB == nil {
return errors.New("no DB was given")
}
Expand Down Expand Up @@ -100,6 +115,8 @@ func NewServer(cfg *Config) (*Server, error) {
now: cfg.Now,
pathToTopic: map[string]string{
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
createdAuditPath: cfg.CreatedAuditTopicName,
createdReportPath: cfg.CreatedReportTopicName,
},
}, nil
}
Expand Down Expand Up @@ -209,17 +226,28 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {

// RegisterHandlers installs the verifyWebhook middleware on r and registers a
// POST handler for each event path (parsed portfolio, created audit, created
// report). Each handler is built by wrapping the matching Server method with
// handleEventGrid.
//
// NOTE(review): this span comes from a rendered diff. The inline
// func(w, r) handler opening and its anonymous request struct directly below
// appear to be superseded lines that the three handleEventGrid registrations
// replace; as shown, the braces do not balance. Confirm against the repository.
func (s *Server) RegisterHandlers(r chi.Router) {
r.Use(s.verifyWebhook)
r.Post(parsedPortfolioPath, func(w http.ResponseWriter, r *http.Request) {
var reqs []struct {
Data *task.ParsePortfolioResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
}
r.Post(parsedPortfolioPath, handleEventGrid(s, s.handleParsePortfolio))
r.Post(createdAuditPath, handleEventGrid(s, s.handleCreatedAuditResponse))
r.Post(createdReportPath, handleEventGrid(s, s.handleCreatedReportResponse))
}

// eventGridTask is the envelope of a single event in a webhook request body.
// It is generic over the type of the event's "data" payload, so one decoding
// path can serve every kind of task response.
type eventGridTask[Payload any] struct {
	// Data is the task-specific payload decoded from the "data" key. It is
	// left nil when the event has no "data" value.
	Data *Payload `json:"data"`
	// EventType is decoded from the "eventType" key.
	EventType string `json:"eventType"`
	// ID is decoded from the "id" key; handlers log it as "event_grid_id".
	ID string `json:"id"`
	// Subject is decoded from the "subject" key.
	Subject string `json:"subject"`
	// DataVersion is decoded from the "dataVersion" key.
	DataVersion string `json:"dataVersion"`
	// MetadataVersion is decoded from the "metadataVersion" key.
	MetadataVersion string `json:"metadataVersion"`
	// EventTime is decoded from the "eventTime" key.
	EventTime time.Time `json:"eventTime"`
	// Topic is decoded from the "topic" key.
	Topic string `json:"topic"`
}

func handleEventGrid[TaskType any](
s *Server,
doHandleFn func(task eventGridTask[TaskType], w http.ResponseWriter),
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var reqs []eventGridTask[TaskType]
if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil {
s.logger.Error("failed to parse webhook request body", zap.Error(err))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
Expand All @@ -231,93 +259,168 @@ func (s *Server) RegisterHandlers(r chi.Router) {
return
}
req := reqs[0]

if req.Data == nil {
s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
doHandleFn(req, w)
}
}

if len(req.Data.Outputs) == 0 {
s.logger.Error("webhook response had no processed portfolios", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
// handleParsePortfolio handles one parsed-portfolio event.
//
// An event whose payload has no outputs is logged and answered with 400.
// Otherwise, inside a single DB transaction, it:
//   - loads the incomplete uploads named by resp.Request.IncompleteUploadIDs
//     and fails if none are found,
//   - requires that they all share one owner and one non-nil holdings date,
//     remembering the latest RanAt among them,
//   - creates a blob and a portfolio for every output, and
//   - marks every incomplete upload as completed at the current time.
//
// A transaction error is logged and answered with 500. On success a summary
// (task id, run time, incomplete upload ids, portfolio ids) is logged.
//
// NOTE(review): this span comes from a rendered diff. Lines that read
// req.Data... appear to be superseded lines interleaved with the resp...
// lines that replace them, so several statements appear twice and the span
// is not valid Go as shown. The description above follows the resp-based
// lines; confirm against the repository.
func (s *Server) handleParsePortfolio(task eventGridTask[task.ParsePortfolioResponse], w http.ResponseWriter) {
resp := task.Data
if len(resp.Outputs) == 0 {
s.logger.Error("webhook response had no processed portfolios", zap.String("event_grid_id", task.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

portfolioIDs := []pacta.PortfolioID{}
var ranAt time.Time
now := s.now()
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
incompleteUploads, err := s.db.IncompleteUploads(tx, req.Data.Request.IncompleteUploadIDs)
portfolioIDs := []pacta.PortfolioID{}
var ranAt time.Time
now := s.now()
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
incompleteUploads, err := s.db.IncompleteUploads(tx, resp.Request.IncompleteUploadIDs)
if err != nil {
return fmt.Errorf("reading incomplete uploads: %w", err)
}
if len(incompleteUploads) == 0 {
return fmt.Errorf("no incomplete uploads found for ids: %v", resp.Request.IncompleteUploadIDs)
}
var holdingsDate *pacta.HoldingsDate
var ownerID pacta.OwnerID
for _, iu := range incompleteUploads {
if ownerID == "" {
ownerID = iu.Owner.ID
} else if ownerID != iu.Owner.ID {
return fmt.Errorf("multiple owners found for incomplete uploads: %+v", incompleteUploads)
}
if iu.HoldingsDate == nil {
return fmt.Errorf("incomplete upload %s had no holdings date", iu.ID)
}
if holdingsDate == nil {
holdingsDate = iu.HoldingsDate
} else if *holdingsDate != *iu.HoldingsDate {
return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads)
}
if iu.RanAt.After(ranAt) {
ranAt = iu.RanAt
}
}
for i, output := range resp.Outputs {
blobID, err := s.db.CreateBlob(tx, &output.Blob)
if err != nil {
return fmt.Errorf("reading incomplete uploads: %w", err)
return fmt.Errorf("creating blob %d: %w", i, err)
}
if len(incompleteUploads) == 0 {
return fmt.Errorf("no incomplete uploads found for ids: %v", req.Data.Request.IncompleteUploadIDs)
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
Blob: &pacta.Blob{ID: blobID},
HoldingsDate: holdingsDate,
})
if err != nil {
return fmt.Errorf("creating portfolio %d: %w", i, err)
}
var holdingsDate *pacta.HoldingsDate
var ownerID pacta.OwnerID
for _, iu := range incompleteUploads {
if ownerID == "" {
ownerID = iu.Owner.ID
} else if ownerID != iu.Owner.ID {
return fmt.Errorf("multiple owners found for incomplete uploads: %+v", incompleteUploads)
}
if iu.HoldingsDate == nil {
return fmt.Errorf("incomplete upload %s had no holdings date", iu.ID)
}
if holdingsDate == nil {
holdingsDate = iu.HoldingsDate
} else if *holdingsDate != *iu.HoldingsDate {
return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads)
}
if iu.RanAt.After(ranAt) {
ranAt = iu.RanAt
}
portfolioIDs = append(portfolioIDs, portfolioID)
}
for iuid, iu := range incompleteUploads {
err := s.db.UpdateIncompleteUpload(
tx,
iu.ID,
db.SetIncompleteUploadCompletedAt(now))
if err != nil {
return fmt.Errorf("updating incomplete upload %s: %w", iuid, err)
}
for i, output := range req.Data.Outputs {
blobID, err := s.db.CreateBlob(tx, &output.Blob)
if err != nil {
return fmt.Errorf("creating blob %d: %w", i, err)
}
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
Blob: &pacta.Blob{ID: blobID},
HoldingsDate: holdingsDate,
})
if err != nil {
return fmt.Errorf("creating portfolio %d: %w", i, err)
}
portfolioIDs = append(portfolioIDs, portfolioID)
}
return nil
})
if err != nil {
s.logger.Error("failed to save response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("parsed portfolio",
zap.String("task_id", string(resp.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.Strings("incomplete_upload_ids", asStrs(resp.Request.IncompleteUploadIDs)),
zap.Int("incomplete_upload_count", len(resp.Request.IncompleteUploadIDs)),
zap.Strings("portfolio_ids", asStrs(portfolioIDs)),
zap.Int("portfolio_count", len(portfolioIDs)))
}

// handleCreatedAuditResponse handles one created-audit event by forwarding the
// analysis ID from the originating request, the event's ID, and the produced
// artifacts to handleCompletedAnalysis with the audit analysis type.
func (s *Server) handleCreatedAuditResponse(task eventGridTask[task.CreateAuditResponse], w http.ResponseWriter) {
	// Inside the body, "task" is the event envelope parameter; it shadows the
	// task package used in the signature.
	resp := task.Data

	// NOTE(review): the event's ID is passed as the task ID here, whereas
	// handleParsePortfolio logs resp.TaskID. Confirm which one is intended.
	s.handleCompletedAnalysis(pacta.AnalysisType_Audit, resp.Request.AnalysisID, task.ID, resp.Artifacts, w)
}

// handleCreatedReportResponse handles one created-report event by forwarding
// the analysis ID from the originating request, the event's ID, and the
// produced artifacts to handleCompletedAnalysis with the report analysis type.
func (s *Server) handleCreatedReportResponse(task eventGridTask[task.CreateReportResponse], w http.ResponseWriter) {
	// Inside the body, "task" is the event envelope parameter; it shadows the
	// task package used in the signature.
	resp := task.Data

	// NOTE(review): the event's ID is passed as the task ID here, whereas
	// handleParsePortfolio logs resp.TaskID. Confirm which one is intended.
	s.handleCompletedAnalysis(pacta.AnalysisType_Report, resp.Request.AnalysisID, task.ID, resp.Artifacts, w)
}

// handleCompletedAnalysis records that an analysis has finished.
//
// Inside a single DB transaction it loads the analysis identified by
// analysisID, fails if its type differs from analysisType, creates a blob and
// an analysis artifact for every entry in artifacts, and sets the analysis's
// completed-at time to the current time.
//
// A transaction error is logged and answered with 500 on w. On success an
// "analysis completed" entry is logged with the analysis type, taskID, the
// analysis ID, and the run time (current time minus the analysis's RanAt).
//
// NOTE(review): this span comes from a rendered diff. The lines that refer to
// incompleteUploads, req.Data, portfolioIDs and the "parsed portfolio" log
// message appear to be superseded lines from the earlier inline handler,
// interleaved with the lines that replace them, so the span is not valid Go as
// shown. The description above follows the analysis-related lines; confirm
// against the repository.
func (s *Server) handleCompletedAnalysis(
analysisType pacta.AnalysisType,
analysisID pacta.AnalysisID,
taskID string,
artifacts []*task.AnalysisArtifact,
w http.ResponseWriter) {
var ranAt time.Time
now := s.now()
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
a, err := s.db.Analysis(tx, analysisID)
if err != nil {
return fmt.Errorf("reading analysis: %w", err)
}
if a.AnalysisType != analysisType {
return fmt.Errorf("analysis type mismatch: %q != %q", a.AnalysisType, analysisType)
}
ranAt = a.RanAt
for _, artifact := range artifacts {
blobID, err := s.db.CreateBlob(tx, &pacta.Blob{
FileName: artifact.FileName,
FileType: artifact.FileType,
BlobURI: artifact.BlobURI,
})
if err != nil {
return fmt.Errorf("creating blob: %w", err)
}
for i, iu := range incompleteUploads {
err := s.db.UpdateIncompleteUpload(
tx,
iu.ID,
db.SetIncompleteUploadCompletedAt(now))
if err != nil {
return fmt.Errorf("updating incomplete upload %d: %w", i, err)
}
_, err = s.db.CreateAnalysisArtifact(tx, &pacta.AnalysisArtifact{
Blob: &pacta.Blob{ID: blobID},
AnalysisID: analysisID,
})
if err != nil {
return fmt.Errorf("creating analysis artifact: %w", err)
}
return nil
})
}
err = s.db.UpdateAnalysis(tx, analysisID, db.SetAnalysisCompletedAt(now))
if err != nil {
s.logger.Error("failed to save response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
return fmt.Errorf("updating analysis: %w", err)
}

s.logger.Info("parsed portfolio",
zap.String("task_id", string(req.Data.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.Strings("incomplete_upload_ids", asStrs(req.Data.Request.IncompleteUploadIDs)),
zap.Int("incomplete_upload_count", len(req.Data.Request.IncompleteUploadIDs)),
zap.Strings("portfolio_ids", asStrs(portfolioIDs)),
zap.Int("portfolio_count", len(portfolioIDs)))
return nil
})
if err != nil {
s.logger.Error("failed to save analysis response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("analysis completed",
zap.String("analysis_type", string(analysisType)),
zap.String("task_id", string(taskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.String("analysis_id", string(analysisID)))
}

func asStrs[T ~string](ts []T) []string {
Expand Down
Loading

0 comments on commit fe3480c

Please sign in to comment.