Skip to content

Commit

Permalink
Modify syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
pulak-opti committed Nov 22, 2023
1 parent cbe60d2 commit dfb0f01
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 165 deletions.
16 changes: 5 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewDefaultConfig() *AgentConfig {
"channel": "optimizely-notifications",
},
},
Notification: NotificationConfig{
Notification: FeatureSyncConfig{
Enable: false,
Default: "redis",
},
Expand Down Expand Up @@ -167,18 +167,12 @@ type AgentConfig struct {
// SyncConfig contains Synchronization configuration for the multiple Agent nodes
type SyncConfig struct {
Pubsub map[string]interface{} `json:"pubsub"`
Notification NotificationConfig `json:"notification"`
Datafile DatafileSyncConfig `json:"datafile"`
Notification FeatureSyncConfig `json:"notification"`
Datafile FeatureSyncConfig `json:"datafile"`
}

// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes
type NotificationConfig struct {
Enable bool `json:"enable"`
Default string `json:"default"`
}

// DatafileSyncConfig contains Datafile Synchronization configuration for the multiple Agent nodes
type DatafileSyncConfig struct {
// FeatureSyncConfig contains Notification Synchronization configuration for the multiple Agent nodes
type FeatureSyncConfig struct {
Enable bool `json:"enable"`
Default string `json:"default"`
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/handlers/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/http"
"strings"

"github.com/go-redis/redis/v8"
"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/middleware"
"github.com/optimizely/agent/pkg/syncer"
Expand Down Expand Up @@ -219,19 +218,15 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc
return nil, errors.New("sdk key not found")
}

redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, conf, sdkKey)
redisSyncer, err := syncer.NewSyncedNotificationCenter(ctx, &zerolog.Logger{}, sdkKey, conf)
if err != nil {
return nil, err
}

client := redis.NewClient(&redis.Options{
Addr: redisSyncer.Host,
Password: redisSyncer.Password,
DB: redisSyncer.Database,
})

// Subscribe to a Redis channel
pubsub := client.Subscribe(ctx, syncer.GetChannelForSDKKey(redisSyncer.Channel, sdkKey))
eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey("opti", sdkKey))
if err != nil {
return nil, err
}

dataChan := make(chan syncer.Event)

Expand All @@ -244,19 +239,12 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc
for {
select {
case <-ctx.Done():
client.Close()
pubsub.Close()
close(dataChan)
logger.Debug().Msg("context canceled, redis notification receiver is closed")
return
default:
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
logger.Err(err).Msg("failed to receive message from redis")
continue
}

case msg := <-eventCh:
var event syncer.Event
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
if err := json.Unmarshal([]byte(msg), &event); err != nil {
logger.Err(err).Msg("failed to unmarshal redis message")
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handlers/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (suite *NotificationTestSuite) TestTrackAndProjectConfigWithSynchronization
"database": 0,
},
},
Notification: config.NotificationConfig{
Notification: config.FeatureSyncConfig{
Enable: true,
Default: "redis",
},
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestRedisNotificationReceiver(t *testing.T) {
"database": 0,
},
},
Notification: config.NotificationConfig{
Notification: config.FeatureSyncConfig{
Enable: true,
Default: "redis",
},
Expand Down
40 changes: 12 additions & 28 deletions pkg/handlers/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"net/http"
"strconv"

"github.com/go-redis/redis/v8"
"github.com/optimizely/agent/config"

"github.com/go-chi/render"
Expand Down Expand Up @@ -156,7 +155,7 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque
for _, sdkKey := range webhookConfig.SDKKeys {
log.Info().Msg("====================== sdk key ============================")
log.Info().Msg(sdkKey)
syncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, sdkKey)
dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig)
if err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
Expand All @@ -167,7 +166,7 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque
return
}

if err := syncer.SyncConfig(sdkKey); err != nil {
if err := dfSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil {
errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error())
log.Error().Msg(errMsg)
render.Status(r, http.StatusInternalServerError)
Expand All @@ -183,46 +182,31 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque

func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error {
fmt.Println("================ starting syncer ===================")
redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, h.syncConfig, "")
dfSyncer, err := syncer.NewDatafileSyncer(h.syncConfig)
if err != nil {
return err
}

client := redis.NewClient(&redis.Options{
Addr: redisSyncer.Host,
Password: redisSyncer.Password,
DB: redisSyncer.Database,
})

// Subscribe to a Redis channel
pubsub := client.Subscribe(ctx, syncer.GetDatafileSyncChannel())

logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger)
if !ok {
logger = &zerolog.Logger{}
}

dataCh, err := dfSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel())
if err != nil {
return err
}

go func() {
for {
select {
case <-ctx.Done():
pubsub.Close()
client.Close()
logger.Debug().Msg("context canceled, redis notification receiver is closed")
return
default:
// fmt.Println("====================== waiting for message ============================")
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
logger.Err(err).Msg("failed to receive message from redis")
continue
}

fmt.Println("===================== message from redis: ", msg.Payload, "=========================")
logger.Info().Msg("received message from redis")
logger.Info().Msg(msg.Payload)

h.optlyCache.UpdateConfigs(msg.Payload)
case key := <-dataCh:
fmt.Println("=========== updating config =============")
fmt.Println("for key: ", key)
h.optlyCache.UpdateConfigs(key)
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/optimizely/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ func defaultLoader(
}

if agentConf.Synchronization.Notification.Enable {
redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, agentConf.Synchronization, sdkKey)
syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), &zerolog.Logger{}, sdkKey, agentConf.Synchronization)
if err != nil {
return nil, err
}
clientOptions = append(clientOptions, client.WithNotificationCenter(redisSyncer))
clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC))
}

var clientUserProfileService decision.UserProfileService
Expand Down
100 changes: 100 additions & 0 deletions pkg/syncer/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/****************************************************************************
* Copyright 2023 Optimizely, Inc. and contributors *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
* You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
* See the License for the specific language governing permissions and *
* limitations under the License. *
***************************************************************************/

// Package syncer provides synchronization across Agent nodes
package syncer

import (
"context"
"errors"

"github.com/go-redis/redis/v8"
"github.com/optimizely/agent/config"
)

const (
// PubSubDefaultChan will be used as default pubsub channel name
PubSubDefaultChan = "optimizely-sync"
// PubSubRedis is the name of pubsub type of Redis
PubSubRedis = "redis"
)

type PubSub interface {
Publish(ctx context.Context, channel string, message interface{}) error
Subscribe(ctx context.Context, channel string) (chan string, error)
}

func NewPubSub(conf config.SyncConfig) (PubSub, error) {
if conf.Notification.Default == PubSubRedis {
return &pubsubRedis{
host: conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string),
password: conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string),
database: conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int),
}, nil
}
return nil, errors.New("pubsub type not supported")
}

type pubsubRedis struct {
host string
password string
database int
}

func (r *pubsubRedis) Publish(ctx context.Context, channel string, message interface{}) error {
client := redis.NewClient(&redis.Options{
Addr: r.host,
Password: r.password,
DB: r.database,
})
defer client.Close()

return client.Publish(ctx, channel, message).Err()
}

func (r *pubsubRedis) Subscribe(ctx context.Context, channel string) (chan string, error) {
client := redis.NewClient(&redis.Options{
Addr: r.host,
Password: r.password,
DB: r.database,
})

// Subscribe to a Redis channel
pubsub := client.Subscribe(ctx, channel)

ch := make(chan string)

go func() {
for {
select {
case <-ctx.Done():
pubsub.Close()
client.Close()
close(ch)
return
default:
msg, err := pubsub.ReceiveMessage(ctx)
if err != nil {
continue
}

ch <- msg.Payload

}
}
}()
return ch, nil
}
Loading

0 comments on commit dfb0f01

Please sign in to comment.