Skip to content

Commit

Permalink
Additional Approximate Plubming
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Jan 2, 2024
1 parent f5a5ebf commit 570db71
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 4 deletions.
157 changes: 157 additions & 0 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {
AllowedAuthSecrets []string

ParsedPortfolioTopicName string
CreatedAuditTopicName string
CreatedReportTopicName string
}

type DB interface {
Expand All @@ -45,10 +47,15 @@ type DB interface {
IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error)
UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error

Analysis(tx db.Tx, id pacta.AnalysisID) (*pacta.Analysis, error)
UpdateAnalysis(tx db.Tx, id pacta.AnalysisID, mutations ...db.UpdateAnalysisFn) error

CreateAnalysisArtifact(tx db.Tx, a *pacta.AnalysisArtifact) (pacta.AnalysisArtifactID, error)
}

const parsedPortfolioPath = "/events/parsed_portfolio"
const createdAuditPath = "/events/created_audit"
const createdReportPath = "/events/created_report"

func (c *Config) validate() error {
if c.Logger == nil {
Expand All @@ -66,6 +73,12 @@ func (c *Config) validate() error {
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
if c.CreatedAuditTopicName == "" {
return errors.New("no created audit topic name given")
}
if c.CreatedReportTopicName == "" {
return errors.New("no created report topic name given")
}
if c.DB == nil {
return errors.New("no DB was given")
}
Expand Down Expand Up @@ -102,6 +115,8 @@ func NewServer(cfg *Config) (*Server, error) {
now: cfg.Now,
pathToTopic: map[string]string{
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
createdAuditPath: cfg.CreatedAuditTopicName,
createdReportPath: cfg.CreatedReportTopicName,
},
}, nil
}
Expand Down Expand Up @@ -212,6 +227,8 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {
func (s *Server) RegisterHandlers(r chi.Router) {
r.Use(s.verifyWebhook)
r.Post(parsedPortfolioPath, s.handleParsePortfolioResponse())
r.Post(createdAuditPath, s.handleCreatedAuditResponse())
r.Post(createdReportPath, s.handleCreatedReportResponse())
}

func (s *Server) handleParsePortfolioResponse() http.HandlerFunc {
Expand Down Expand Up @@ -326,6 +343,146 @@ func (s *Server) handleParsePortfolioResponse() http.HandlerFunc {
}
}

func (s *Server) handleCreatedAuditResponse() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var reqs []struct {
Data *task.CreateAuditResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
}
if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil {
s.logger.Error("failed to parse webhook request body", zap.Error(err))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
if len(reqs) != 1 {
s.logger.Error("webhook response had unexpected number of events", zap.Int("event_count", len(reqs)))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
req := reqs[0]

if req.Data == nil {
s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

var ranAt time.Time
now := s.now()
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
for _, artifact := range req.Data.Artifacts {
blobID, err := s.db.CreateBlob(tx, &pacta.Blob{
FileName: artifact.FileName,
FileType: artifact.FileType,
BlobURI: artifact.BlobURI,
})
if err != nil {
return fmt.Errorf("creating blob: %w", err)
}
_, err = s.db.CreateAnalysisArtifact(tx, &pacta.AnalysisArtifact{
Blob: &pacta.Blob{ID: blobID},
AnalysisID: req.Data.Request.AnalysisID,
})
if err != nil {
return fmt.Errorf("creating analysis artifact: %w", err)
}
}
err := s.db.UpdateAnalysis(tx, req.Data.Request.AnalysisID, db.SetAnalysisCompletedAt(now))
if err != nil {
return fmt.Errorf("updating analysis: %w", err)
}
return nil
})
if err != nil {
s.logger.Error("failed to save analysis response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("audit completed",
zap.String("task_id", string(req.Data.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.String("analysis_id", string(req.Data.Request.AnalysisID)))
}
}

func (s *Server) handleCreatedReportResponse() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var reqs []struct {
Data *task.CreateReportResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
}
if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil {
s.logger.Error("failed to parse webhook request body", zap.Error(err))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
if len(reqs) != 1 {
s.logger.Error("webhook response had unexpected number of events", zap.Int("event_count", len(reqs)))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
req := reqs[0]

if req.Data == nil {
s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

var ranAt time.Time
now := s.now()
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
for _, artifact := range req.Data.Artifacts {
blobID, err := s.db.CreateBlob(tx, &pacta.Blob{
FileName: artifact.FileName,
FileType: artifact.FileType,
BlobURI: artifact.BlobURI,
})
if err != nil {
return fmt.Errorf("creating blob: %w", err)
}
_, err = s.db.CreateAnalysisArtifact(tx, &pacta.AnalysisArtifact{
Blob: &pacta.Blob{ID: blobID},
AnalysisID: req.Data.Request.AnalysisID,
})
if err != nil {
return fmt.Errorf("creating analysis artifact: %w", err)
}
}
err := s.db.UpdateAnalysis(tx, req.Data.Request.AnalysisID, db.SetAnalysisCompletedAt(now))
if err != nil {
return fmt.Errorf("updating analysis: %w", err)
}
return nil
})
if err != nil {
s.logger.Error("failed to save analysis response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("report completed",
zap.String("task_id", string(req.Data.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.String("analysis_id", string(req.Data.Request.AnalysisID)))
}
}

func asStrs[T ~string](ts []T) []string {
ss := make([]string, len(ts))
for i, t := range ts {
Expand Down
16 changes: 12 additions & 4 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ type CreateAuditRequest struct {
BlobURIs []pacta.BlobURI
}

type AnalysisArtifact struct {
BlobURI pacta.BlobURI
FileName string
FileType pacta.FileType
}

type CreateAuditResponse struct {
TaskID ID
Request *CreateAuditRequest
TaskID ID
Request *CreateAuditRequest
Artifacts []*AnalysisArtifact
}

type CreateReportRequest struct {
Expand All @@ -54,8 +61,9 @@ type CreateReportRequest struct {
}

type CreateReportResponse struct {
TaskID ID
Request *CreateReportRequest
TaskID ID
Request *CreateReportRequest
Artifacts []*AnalysisArtifact
}

type EnvVar struct {
Expand Down

0 comments on commit 570db71

Please sign in to comment.