diff --git a/azure/azevents/BUILD.bazel b/azure/azevents/BUILD.bazel new file mode 100644 index 0000000..7555876 --- /dev/null +++ b/azure/azevents/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "azevents", + srcs = ["azevents.go"], + importpath = "github.com/RMI/pacta/azure/azevents", + visibility = ["//visibility:public"], + deps = [ + "@com_github_go_chi_chi_v5//:chi", + "@org_uber_go_zap//:zap", + ], +) diff --git a/azure/azevents/azevents.go b/azure/azevents/azevents.go new file mode 100644 index 0000000..3b0d897 --- /dev/null +++ b/azure/azevents/azevents.go @@ -0,0 +1,108 @@ +// Package azevents handles incoming webhook events from Azure Event Grid. For +// more info on the verification/validation logic, see +// https://learn.microsoft.com/en-us/azure/event-grid/webhook-event-delivery#validation-details +package azevents + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "go.uber.org/zap" +) + +type Config struct { + Logger *zap.Logger +} + +func (c *Config) validate() error { + if c.Logger == nil { + return errors.New("no logger was given") + } + return nil +} + +// Server handles both validating the Event Grid subscription and handling incoming events. +type Server struct { + logger *zap.Logger +} + +func NewServer(cfg *Config) (*Server, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("invalid config given: %w", err) + } + + return &Server{ + logger: cfg.Logger, + }, nil +} + +func (s *Server) verifyWebhook(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("aeg-event-type") != "SubscriptionValidation" { + next.ServeHTTP(w, r) + return + } + + // If we're here, we're validating to Azure that we own this endpoint and want + // to accept webhook calls here. Only calls to valid webhook endpoints will + // trigger this middleware (the rest will just get 404s), so if we've made it + // this far, validate that yes, we'll take webhook invocations. + var reqs []struct { + Id string `json:"id"` + Topic string `json:"topic"` + Subject string `json:"subject"` + EventType string `json:"eventType"` + EventTime time.Time `json:"eventTime"` + MetadataVersion string `json:"metadataVersion"` + DataVersion string `json:"dataVersion"` + Data *struct { + ValidationCode string `json:"validationCode"` + ValidationUrl string `json:"validationUrl"` + } `json:"data"` + } + if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil { + s.logger.Error("failed to decode subscription validation request", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + if len(reqs) != 1 { + s.logger.Error("unexpected number of validation requests", zap.Any("reqs", reqs)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + req := reqs[0] + if req.Data == nil { + s.logger.Error("no data provided in validation request", zap.Any("req", req)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + s.logger.Info("received SubscriptionValidation request", zap.Any("req", req)) + + resp := struct { + ValidationResponse string `json:"validationResponse"` + }{req.Data.ValidationCode} + if err := json.NewEncoder(w).Encode(resp); err != nil { + s.logger.Error("failed to encode JSON validation response", zap.Error(err)) + } + }) +} + +func (s *Server) RegisterHandlers(r chi.Router) { + r.Use(s.verifyWebhook) + r.Post("/events/processed_portfolio", func(w http.ResponseWriter, r *http.Request) { + dat, err := io.ReadAll(r.Body) + if err != nil { + s.logger.Error("failed to read webhook request body", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + s.logger.Info("processed porfolio", zap.String("portfolio_data", string(dat))) + }) +} diff --git a/cmd/server/BUILD.bazel b/cmd/server/BUILD.bazel index f8d79d8..a093c11 100644 --- a/cmd/server/BUILD.bazel +++ b/cmd/server/BUILD.bazel @@ -9,6 +9,7 @@ go_library( visibility = ["//visibility:private"], deps = [ "//azure/azblob", + "//azure/azevents", "//azure/aztask", "//cmd/runner/taskrunner", "//cmd/server/pactasrv", diff --git a/cmd/server/main.go b/cmd/server/main.go index 1269997..15075f9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/RMI/pacta/azure/azblob" + "github.com/RMI/pacta/azure/azevents" "github.com/RMI/pacta/azure/aztask" "github.com/RMI/pacta/cmd/runner/taskrunner" "github.com/RMI/pacta/cmd/server/pactasrv" @@ -274,7 +275,15 @@ func run(args []string) error { }), }) + eventSrv, err := azevents.NewServer(&azevents.Config{ + Logger: logger, + }) + if err != nil { + return fmt.Errorf("failed to init Azure Event Grid handler: %w", err) + } + r := chi.NewRouter() + r.Group(eventSrv.RegisterHandlers) jwKey, err := jwk.FromRaw(sec.AuthVerificationKey.PublicKey) if err != nil {