Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
pulak-opti committed Nov 22, 2023
1 parent dfb0f01 commit 5e56fb2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/handlers/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc
return nil, err
}

eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey("opti", sdkKey))
eventCh, err := redisSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey))
if err != nil {
return nil, err
}
Expand Down
29 changes: 26 additions & 3 deletions pkg/syncer/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,33 @@ type PubSub interface {

func NewPubSub(conf config.SyncConfig) (PubSub, error) {
if conf.Notification.Default == PubSubRedis {
host, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string)
if !ok {
return nil, errors.New("host is not valid")
}
password, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string)
if !ok {
return nil, errors.New("password is not valid")
}
database, ok := conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int)
if !ok {
return nil, errors.New("database is not valid")
}

client := redis.NewClient(&redis.Options{
Addr: host,
Password: password,
DB: database,
})
defer client.Close()
if err := client.Ping(context.Background()).Err(); err != nil {
return nil, err
}

return &pubsubRedis{
host: conf.Pubsub[PubSubRedis].(map[string]interface{})["host"].(string),
password: conf.Pubsub[PubSubRedis].(map[string]interface{})["password"].(string),
database: conf.Pubsub[PubSubRedis].(map[string]interface{})["database"].(int),
host: host,
password: password,
database: database,
}, nil
}
return nil, errors.New("pubsub type not supported")
Expand Down
4 changes: 4 additions & 0 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Event struct {
}

func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) {
mutexLock.Lock()
defer mutexLock.Unlock()

if nc, ok := ncCache[sdkKey]; ok {
return nc, nil
}
Expand All @@ -63,6 +66,7 @@ func NewSyncedNotificationCenter(ctx context.Context, logger *zerolog.Logger, sd
}

nc := &SyncedNotificationCenter{
ctx: ctx,
logger: logger,
sdkKey: sdkKey,
pubsub: pubsub,
Expand Down

0 comments on commit 5e56fb2

Please sign in to comment.