Skip to content

Commit

Permalink
feat: add redis syncer for webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
pulak-opti committed Nov 17, 2023
1 parent 9825f2d commit cbe60d2
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 16 deletions.
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.21 as builder
RUN addgroup -u 1000 agentgroup &&\
useradd -u 1000 agentuser -g agentgroup
WORKDIR /go/src/github.com/optimizely/agent
COPY . .
RUN make setup build &&\
make ci_build_static_binary

FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/src/github.com/optimizely/agent/bin/optimizely /optimizely
COPY --from=builder /etc/passwd /etc/passwd
USER agentuser
CMD ["/optimizely"]
5 changes: 3 additions & 2 deletions cmd/optimizely/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func main() {
sdkMetricsRegistry := optimizely.NewRegistry(agentMetricsRegistry)

ctx, cancel := context.WithCancel(context.Background()) // Create default service context
sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
defer cancel()
sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners
optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry)
optlyCache.Init(conf.SDKKeys)

Expand All @@ -286,7 +287,7 @@ func main() {

log.Info().Str("version", conf.Version).Msg("Starting services.")
sg.GoListenAndServe("api", conf.API.Port, apiRouter)
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(optlyCache, conf.Webhook))
sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(ctx, optlyCache, *conf))
sg.GoListenAndServe("admin", conf.Admin.Port, adminRouter) // Admin should be added last.

// wait for server group to shutdown
Expand Down
17 changes: 10 additions & 7 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,17 @@ webhook:
## http listener port
port: "8089"
# ## a map of Optimizely Projects to one or more SDK keys
# projects:
# ## <project-id>: Optimizely project id as an integer
# <project-id>:
# ## sdkKeys: a list of SDKs linked to this project
# sdkKeys:
# - <sdk-key-1>
projects:
## <project-id>: Optimizely project id as an integer
23897262460:
## sdkKeys: a list of SDKs linked to this project
sdkKeys:
- RiowyaMnbPxLa4dPWrDqu
# - <sdk-key-1>
# ## secret: webhook secret used the validate the notification
# secret: <secret-10000>
# ## skipSignatureCheck: override the signature check (not recommended for production)
# skipSignatureCheck: true
skipSignatureCheck: true

##
## optimizely client configurations (options passed to the underlying go-sdk)
Expand Down Expand Up @@ -257,3 +257,6 @@ synchronization:
notification:
enable: false
default: "redis"
datafile:
enable: true
default: "redis"
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type AgentConfig struct {
type SyncConfig struct {
Pubsub map[string]interface{} `json:"pubsub"`
Notification NotificationConfig `json:"notification"`
Datafile DatafileSyncConfig `json:"datafile"`
}

// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes
Expand All @@ -176,6 +177,12 @@ type NotificationConfig struct {
Default string `json:"default"`
}

// DatafileSyncConfig contains Datafile Synchronization configuration for the multiple Agent nodes
type DatafileSyncConfig struct {
Enable bool `json:"enable"`
Default string `json:"default"`
}

// HTTPSDisabledWarning is logged when keyfile and certfile are not provided in server configuration
var HTTPSDisabledWarning = "keyfile and certfile not available, so server will use HTTP. For production deployments, it is recommended to either set keyfile and certfile for HTTPS, or run Agent behind a load balancer/reverse proxy that uses HTTPS."

Expand Down
86 changes: 85 additions & 1 deletion pkg/handlers/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
package handlers

import (
"context"
"crypto/hmac"
"crypto/sha1"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

"github.com/go-redis/redis/v8"
"github.com/optimizely/agent/config"

"github.com/go-chi/render"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/optimizely/agent/pkg/optimizely"
"github.com/optimizely/agent/pkg/syncer"
)

const signatureHeader = "X-Hub-Signature"
Expand All @@ -58,13 +63,15 @@ type OptlyMessage struct {
type OptlyWebhookHandler struct {
optlyCache optimizely.Cache
ProjectMap map[int64]config.WebhookProject
syncConfig config.SyncConfig
}

// NewWebhookHandler returns a new instance of OptlyWebhookHandler
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject) *OptlyWebhookHandler {
func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, conf config.SyncConfig) *OptlyWebhookHandler {
return &OptlyWebhookHandler{
optlyCache: optlyCache,
ProjectMap: projectMap,
syncConfig: conf,
}
}

Expand Down Expand Up @@ -140,7 +147,84 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque

// Iterate through all SDK keys and update config
for _, sdkKey := range webhookConfig.SDKKeys {
fmt.Println("=========== updating config =============")
h.optlyCache.UpdateConfigs(sdkKey)
}

if h.syncConfig.Datafile.Enable {
log.Info().Msg("======================= Syncing datafile ============================")
for _, sdkKey := range webhookConfig.SDKKeys {
log.Info().Msg("====================== sdk key ============================")
log.Info().Msg(sdkKey)
syncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, sdkKey)
if err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
render.Status(r, http.StatusInternalServerError)
render.JSON(w, r, render.M{
"error": errMsg,
})
return
}

if err := syncer.SyncConfig(sdkKey); err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
render.Status(r, http.StatusInternalServerError)
render.JSON(w, r, render.M{
"error": errMsg,
})
return
}
}
}
w.WriteHeader(http.StatusNoContent)
}

func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error {
fmt.Println("================ starting syncer ===================")
redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, "")
if err != nil {
return err
}

client := redis.NewClient(&redis.Options{
Addr: redisSyncer.Host,
Password: redisSyncer.Password,
DB: redisSyncer.Database,
})

// Subscribe to a Redis channel
pubsub := client.Subscribe(ctx, syncer.GetDatafileSyncChannel())

logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger)
if !ok {
logger = &zerolog.Logger{}
}

go func() {
for {
select {
case <-ctx.Done():
pubsub.Close()
client.Close()
logger.Debug().Msg("context canceled, redis notification receiver is closed")
return
default:
// fmt.Println("====================== waiting for message ============================")
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
logger.Err(err).Msg("failed to receive message from redis")
continue
}

fmt.Println("===================== message from redis: ", msg.Payload, "=========================")
logger.Info().Msg("received message from redis")
logger.Info().Msg(msg.Payload)

h.optlyCache.UpdateConfigs(msg.Payload)
}
}
}()
return nil
}
12 changes: 10 additions & 2 deletions pkg/routers/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package routers

import (
"context"
"fmt"

"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/handlers"

Expand All @@ -29,12 +32,17 @@ import (
)

// NewWebhookRouter returns HTTP API router
func NewWebhookRouter(optlyCache optimizely.Cache, conf config.WebhookConfig) *chi.Mux {
func NewWebhookRouter(ctx context.Context, optlyCache optimizely.Cache, conf config.AgentConfig) *chi.Mux {
r := chi.NewRouter()

r.Use(chimw.AllowContentType("application/json"))
r.Use(render.SetContentType(render.ContentTypeJSON))
webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Projects)
webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, conf.Synchronization)
if conf.Synchronization.Datafile.Enable {
if err := webhookAPI.StartSyncer(ctx); err != nil {
fmt.Errorf("failed to start datafile syncer: %s", err.Error())
}
}

r.Post("/webhooks/optimizely", webhookAPI.HandleWebhook)
return r
Expand Down
26 changes: 22 additions & 4 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,10 @@ func NewRedisSyncer(logger *zerolog.Logger, conf config.SyncConfig, sdkKey strin
mutexLock.Lock()
defer mutexLock.Unlock()

if nc, found := ncCache[sdkKey]; found {
if nc, found := ncCache[sdkKey]; found && sdkKey != "" {
return nc, nil
}

if !conf.Notification.Enable {
return nil, errors.New("notification syncer is not enabled")
}
if conf.Notification.Default != PubSubRedis {
return nil, errors.New("redis syncer is not set as default")
}
Expand Down Expand Up @@ -159,6 +156,27 @@ func (r *RedisSyncer) Send(t notification.Type, n interface{}) error {
return nil
}

func (r *RedisSyncer) SyncConfig(sdkKey string) error {
client := redis.NewClient(&redis.Options{
Addr: r.Host,
Password: r.Password,
DB: r.Database,
})
defer client.Close()
channel := GetDatafileSyncChannel()

if err := client.Publish(r.ctx, channel, sdkKey).Err(); err != nil {
r.logger.Err(err).Msg("failed to publish datafile sync event to pub/sub")
return err
}
fmt.Println("====================== published message ============================")
return nil
}

func GetDatafileSyncChannel() string {
return fmt.Sprintf("%s-datafile", PubSubDefaultChan)
}

func GetChannelForSDKKey(channel, key string) string {
return fmt.Sprintf("%s-%s", channel, key)
}

0 comments on commit cbe60d2

Please sign in to comment.