Skip to content

Commit

Permalink
Finish adding Event Grid validation, and add local testing tools
Browse files Browse the repository at this point in the history
This PR finishes up the work of validating requests from Event Grid, and adds support for running the server with a public-facing endpoint via the `--with_public_endpoint=<subdomain>` flag, which is proxied via [frp](https://github.com/fatedier/frp).

I didn't use our [frpembed](https://github.com/Silicon-Ally/frpembed) library because there's an issue with `quic-go` not supporting Go 1.20 (may be fixed, didn't want to go down the rabbithole)
  • Loading branch information
bcspragu committed Oct 31, 2023
1 parent 0f8b404 commit 47ccc47
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 8 deletions.
43 changes: 42 additions & 1 deletion azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net/http"
"path"
"time"

"github.com/go-chi/chi/v5"
Expand All @@ -17,18 +18,30 @@ import (

type Config struct {
Logger *zap.Logger

Subscription string
ResourceGroup string
}

func (c *Config) validate() error {
if c.Logger == nil {
return errors.New("no logger was given")
}
if c.Subscription == "" {
return errors.New("no subscription given")
}
if c.ResourceGroup == "" {
return errors.New("no resource group given")
}
return nil
}

// Server handles both validating the Event Grid subscription and handling incoming events.
type Server struct {
logger *zap.Logger

subscription string
resourceGroup string
}

func NewServer(cfg *Config) (*Server, error) {
Expand All @@ -37,12 +50,25 @@ func NewServer(cfg *Config) (*Server, error) {
}

return &Server{
logger: cfg.Logger,
logger: cfg.Logger,
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
}, nil
}

var pathToTopic = map[string]string{
"/events/processed_portfolio": "processed-portfolios",
}

func (s *Server) verifyWebhook(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
topic, ok := pathToTopic[r.URL.Path]
if !ok {
s.logger.Error("no topic found for path", zap.String("path", r.URL.Path))
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
return
}

if r.Header.Get("aeg-event-type") != "SubscriptionValidation" {
next.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -85,6 +111,21 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {

s.logger.Info("received SubscriptionValidation request", zap.Any("req", req))

// Validate the request event type and topic
if got, want := req.EventType, "Microsoft.EventGrid.SubscriptionValidationEvent"; got != want {
s.logger.Error("invalid topic given for path", zap.String("got_event_type", got), zap.String("expected_event_type", want))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
fullTopic := path.Join("/subscriptions", s.subscription, "resourceGroups", s.resourceGroup, "providers/Microsoft.EventGrid/topics", topic)
if req.Topic != fullTopic {
s.logger.Error("invalid topic given for path", zap.String("got_topic", req.Topic), zap.String("expected_topic", fullTopic))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

s.logger.Info("validated SubscriptionValidation, responding success", zap.String("request_id", req.Id))

resp := struct {
ValidationResponse string `json:"validationResponse"`
}{req.Data.ValidationCode}
Expand Down
3 changes: 3 additions & 0 deletions cmd/server/configs/dev.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
env dev
allowed_cors_origin https://pacta.dev.rmi.siliconally.dev
port 80

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-dev
3 changes: 3 additions & 0 deletions cmd/server/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
env local
allowed_cors_origin http://localhost:3000

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-local

secret_postgres_host UNUSED
# Also unused
secret_postgres_port 1234
Expand Down
9 changes: 7 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/RMI/pacta/task"
"github.com/Silicon-Ally/cryptorand"
"github.com/Silicon-Ally/zaphttplog"
"github.com/go-chi/chi/v5"
chi "github.com/go-chi/chi/v5"
"github.com/go-chi/httprate"
"github.com/go-chi/jwtauth/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -62,6 +62,9 @@ func run(args []string) error {
env = fs.String("env", "", "The environment that we're running in.")
localDSN = fs.String("local_dsn", "", "If set, override the DB addresses retrieved from the secret configuration. Can only be used when running locally.")

azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")

// Only when running locally because the Dockerized runner can't use local `az` CLI credentials
localDockerTenantID = fs.String("local_docker_tenant_id", "", "The Azure Tenant ID the localdocker service principal lives in")
localDockerClientID = fs.String("local_docker_client_id", "", "The client ID of the localdocker service principal")
Expand Down Expand Up @@ -276,7 +279,9 @@ func run(args []string) error {
})

eventSrv, err := azevents.NewServer(&azevents.Config{
Logger: logger,
Logger: logger,
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
})
if err != nil {
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
Expand Down
43 changes: 40 additions & 3 deletions scripts/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cd "$ROOT"

# We keep it around because we'll need it at some point, but it can't be empty.
VALID_FLAGS=(
"unused"
"with_public_endpoint"
)

VALID_FLAGS_NO_ARGS=(
Expand All @@ -23,6 +23,27 @@ OPTS=$(getopt \
-- "$@"
)

if ! [ -x "$(command -v sops)" ]; then
echo 'Error: sops is not installed.' >&2
exit 1
fi
if ! [ -x "$(command -v jq)" ]; then
echo 'Error: jq is not installed.' >&2
exit 1
fi

SOPS_DATA="$(sops -d "${ROOT}/secrets/local.enc.json")"
LOCAL_DOCKER_CREDS="$(echo $SOPS_DATA | jq .localdocker)"

FRPC_PID=""
function cleanup {
if [[ ! -z "${FRPC_PID}" ]]; then
echo "Stopping FRP client/proxy..."
kill $FRPC_PID
fi
}
trap cleanup EXIT

eval set --$OPTS
declare -a FLAGS=()
while [ ! $# -eq 0 ]
Expand All @@ -31,6 +52,24 @@ do
--use_azure_runner)
FLAGS+=("--use_azure_runner")
;;
--with_public_endpoint)
if ! [ -x "$(command -v frpc)" ]; then
echo 'Error: frpc is not installed, cannot run the FRP client/proxy.' >&2
exit 1
fi
FRP="$(echo $SOPS_DATA | jq .frpc)"
FRP_ADDR="$(echo $FRP | jq -r .addr)"
echo "Running FRP proxy at ${FRP_ADDR}..."
frpc http \
--server_addr="$FRP_ADDR" \
--server_port="$(echo $FRP | jq -r .port)" \
--token="$(echo $FRP | jq -r .token)" \
--local_port=8081 \
--proxy_name="webhook-$2" \
--sd="$2" &
FRPC_PID=$!
shift # Extra shift for the subdomain parameter
;;
esac
shift
done
Expand All @@ -53,8 +92,6 @@ FLAGS+=(
"--local_dsn=${LOCAL_DSN}"
)

LOCAL_DOCKER_CREDS="$(sops -d --extract '["localdocker"]' "${ROOT}/secrets/local.enc.json")"

FLAGS+=(
"--local_docker_tenant_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .tenant_id)"
"--local_docker_client_id=$(echo $LOCAL_DOCKER_CREDS | jq -r .client_id)"
Expand Down
9 changes: 7 additions & 2 deletions secrets/local.enc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
"client_id": "ENC[AES256_GCM,data:yex2my4EAYV3k4czIZXn4gnANaS/jmpDV8gFVKOgeII5ce+h,iv:6/YnhiTdR1sVeR2QhNPmI/q1jb3oSQDODTqxAbDlESo=,tag:7SNZU0pbTt5QweGzKh3XoA==,type:str]",
"password": "ENC[AES256_GCM,data:rA7aewwH4umPWAnzuOen8oYgLwvQsf19rfGOWARbGfW5lXSuxx4F4g==,iv:KRq/lxZ28JfUkGeBlDvmMfpVLVbPlEmOD8l/aRfG5Zo=,tag:pH0oAueSsrXaXcuXM4LlEQ==,type:str]"
},
"frpc": {
"addr": "ENC[AES256_GCM,data:mke8TeqY6AWnbiWRQa6D0w==,iv:O8tnERYeDK2ACeOzlEiHnhty9ILYoi8cCSa80ngDHg4=,tag:zA/zMc0DIATseyF5BUVymQ==,type:str]",
"port": "ENC[AES256_GCM,data:d1sh9w==,iv:bmzN6duVQylgRXAchkPavtUsdbNdd+S3f7N73ukFV2A=,tag:4nFCulM5yYdMvTY2CwiJ0g==,type:float]",
"token": "ENC[AES256_GCM,data:qe9Hy0tKKrQ7rFzOQuTlDKGDqA2489hvEbjbtYFMuJ3wd5gy,iv:gV+OasDEHtI28TVBFmyxrPjMqEv5J1jpWMAzR1N70MQ=,tag:543oFV21kJj9qBgkyewdIg==,type:str]"
},
"sops": {
"kms": null,
"gcp_kms": null,
Expand All @@ -18,8 +23,8 @@
],
"hc_vault": null,
"age": null,
"lastmodified": "2023-10-28T02:50:38Z",
"mac": "ENC[AES256_GCM,data:16VSdcgdGXVxl6xeCv/2LmwOnHyJE8Y9sOEmY2y/c/CpAgNVLF+yBtQnBSQyMlVUskuBEFQxpcm1TQXjz11ngtlRIaITqJGcPDQoIdQMUvamP8Ku69pxrc0UPZOR5qv5BkAoZapCxH5xWWQPakgaHf74NES4Bn2FUe8tR6T50JM=,iv:gSTP1rTGhui1yuCsREeNR9JFcXfVyGfLcTTInAu27qs=,tag:0rv8y1y4qvTkL6DtH50KEw==,type:str]",
"lastmodified": "2023-10-31T18:08:10Z",
"mac": "ENC[AES256_GCM,data:OtzdhIFbPxagE61i0gA871aw8Pkl/PZrwigtX2S3tisOviuYwK0h1QNZN+eKTDXyEbs7fht4ruo4m6EEmZS2nwvgquAUaoA9A9yXARfYFRPQ3D5a21WH8nlN0JMBGoD70VwGjw22Yy2XfG5FbaOnVN2eQ5QpSHaDw1umEiR0rWE=,iv:YZIof2lisX96LDntHnsyDKzQjtlel5SM+Tvpq4C9Sn0=,tag:gEGFEGNB1Fvf1VMKLokMYg==,type:str]",
"pgp": null,
"unencrypted_suffix": "_unencrypted",
"version": "3.8.1"
Expand Down

0 comments on commit 47ccc47

Please sign in to comment.