Skip to content

Commit

Permalink
Merge pull request TheThingsNetwork#2231 from TheThingsIndustries/fea…
Browse files Browse the repository at this point in the history
…ture/redis-read-replicas
  • Loading branch information
johanstokking authored Jul 7, 2020
2 parents 2a8bf94 + 80314b5 commit cd0d0b2
Show file tree
Hide file tree
Showing 13 changed files with 76 additions and 19 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Support for Redis read replicas, reducing the load on the read-write Redis master.

### Changed

### Deprecated
Expand Down Expand Up @@ -146,7 +148,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Missing target ns and as in parameters in device claiming sdk claim request
- Missing target ns and as in parameters in device claiming sdk claim request
- Handling of device unsetting the ADR bit in uplink, after ADR has been started.
- Invalid `oauth-server-address` in CLI config generated by `use` command when config file is already present.
- Network Server now properly handles FPort 0 data uplinks carrying FOpts.
Expand Down
14 changes: 14 additions & 0 deletions doc/content/reference/configuration/the-things-stack.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ Or you can enable failover using [Redis Sentinel](https://redis.io/topics/sentin
- `redis.failover.addresses`: List of addresses of the Redis Sentinel instances (required)
- `redis.failover.master-name`: Redis Sentinel master name (required)

Further, you can reduce the load on the Redis master by specifying read-only configuration:

- `redis.readonly.address`: Address of the Redis server
- `redis.readonly.password`: Password of the Redis server
- `redis.readonly.database`: Redis database to use
- `redis.readonly.pool-size`: The maximum size of the connection pool

## Blob Options

The `blob` options configure how {{% tts %}} reads or writes files such as pictures, the frequency plans repository or files required for Backend Interfaces interoperability. The `provider` field selects the provider that is used, and which other options are read.
Expand Down Expand Up @@ -181,6 +188,13 @@ When using the `redis` backend, the global [Redis configuration]({{< ref "#redis
- `events.redis.namespace`: Namespace for Redis keys
- `events.redis.pool-size`: The maximum size of the connection pool

Similar to the global Redis configuration, you can reduce the load on the Redis master by specifying read-only configuration:

- `events.redis.readonly.address`: Address of the Redis server
- `events.redis.readonly.password`: Password of the Redis server
- `events.redis.readonly.database`: Redis database to use
- `events.redis.readonly.pool-size`: The maximum size of the connection pool

With the `cloud` backend, the configured publish and subscribe URLs are passed to [the Go CDK](https://gocloud.dev/howto/pubsub/).

- `events.cloud.publish-url`: URL for the topic to send events
Expand Down
2 changes: 1 addition & 1 deletion pkg/applicationserver/io/packages/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *ApplicationPackagesRegistry) makeAssociationKeyFunc(devUID string) func
func (r ApplicationPackagesRegistry) Get(ctx context.Context, ids ttnpb.ApplicationPackageAssociationIdentifiers, paths []string) (*ttnpb.ApplicationPackageAssociation, error) {
pb := &ttnpb.ApplicationPackageAssociation{}
defer trace.StartRegion(ctx, "get application package association by id").End()
if err := ttnredis.GetProto(r.Redis, r.associationKey(unique.ID(ctx, ids.EndDeviceIdentifiers), r.fPortStr(ids.FPort))).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.associationKey(unique.ID(ctx, ids.EndDeviceIdentifiers), r.fPortStr(ids.FPort))).ScanProto(pb); err != nil {
return nil, err
}
return applyAssociationFieldMask(nil, pb, appendImplicitAssociationGetPaths(paths...)...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/applicationserver/io/pubsub/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (r *PubSubRegistry) makeUIDKeyFunc(appUID string) func(id string) string {
// Get implements pubsub.Registry.
func (r PubSubRegistry) Get(ctx context.Context, ids ttnpb.ApplicationPubSubIdentifiers, paths []string) (*ttnpb.ApplicationPubSub, error) {
pb := &ttnpb.ApplicationPubSub{}
if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.PubSubID)).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.PubSubID)).ScanProto(pb); err != nil {
return nil, err
}
return applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...)
Expand All @@ -101,7 +101,7 @@ func (r PubSubRegistry) Range(ctx context.Context, paths []string, f func(contex
return errApplicationUID.WithCause(err).WithAttributes("application_uid", appUID, "pub_sub_id", psID)
}
pb := &ttnpb.ApplicationPubSub{}
if err := ttnredis.GetProto(r.Redis, r.uidKey(appUID, psID)).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(appUID, psID)).ScanProto(pb); err != nil {
return err
}
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/applicationserver/io/web/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *WebhookRegistry) makeIDKeyFunc(appUID string) func(id string) string {
// Get implements WebhookRegistry.
func (r WebhookRegistry) Get(ctx context.Context, ids ttnpb.ApplicationWebhookIdentifiers, paths []string) (*ttnpb.ApplicationWebhook, error) {
pb := &ttnpb.ApplicationWebhook{}
if err := ttnredis.GetProto(r.Redis, r.idKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.WebhookID)).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.idKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.WebhookID)).ScanProto(pb); err != nil {
return nil, err
}
return applyWebhookFieldMask(nil, pb, appendImplicitWebhookGetPaths(paths...)...)
Expand All @@ -80,7 +80,7 @@ func (r WebhookRegistry) Get(ctx context.Context, ids ttnpb.ApplicationWebhookId
func (r WebhookRegistry) List(ctx context.Context, ids ttnpb.ApplicationIdentifiers, paths []string) ([]*ttnpb.ApplicationWebhook, error) {
var pbs []*ttnpb.ApplicationWebhook
appUID := unique.ID(ctx, ids)
err := ttnredis.FindProtos(r.Redis, r.appKey(appUID), r.makeIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) {
err := ttnredis.FindProtos(r.Redis.ReadOnlyClient(), r.appKey(appUID), r.makeIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) {
pb := &ttnpb.ApplicationWebhook{}
return pb, func() (bool, error) {
pb, err := applyWebhookFieldMask(nil, pb, appendImplicitWebhookGetPaths(paths...)...)
Expand Down
8 changes: 4 additions & 4 deletions pkg/applicationserver/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *DeviceRegistry) Get(ctx context.Context, ids ttnpb.EndDeviceIdentifiers
defer trace.StartRegion(ctx, "get end device").End()

pb := &ttnpb.EndDevice{}
if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
return nil, err
}
return ttnpb.FilterGetEndDevice(pb, paths...)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *LinkRegistry) Get(ctx context.Context, ids ttnpb.ApplicationIdentifiers
defer trace.StartRegion(ctx, "get link").End()

pb := &ttnpb.ApplicationLink{}
if err := ttnredis.GetProto(r.Redis, r.appKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.appKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
return nil, err
}
return applyLinkFieldMask(nil, pb, paths...)
Expand All @@ -263,7 +263,7 @@ var errApplicationUID = errors.DefineCorruption("application_uid", "invalid appl
func (r *LinkRegistry) Range(ctx context.Context, paths []string, f func(context.Context, ttnpb.ApplicationIdentifiers, *ttnpb.ApplicationLink) bool) error {
defer trace.StartRegion(ctx, "range links").End()

uids, err := r.Redis.SMembers(r.allKey(ctx)).Result()
uids, err := r.Redis.ReadOnlyClient().SMembers(r.allKey(ctx)).Result()
if err != nil {
return ttnredis.ConvertError(err)
}
Expand All @@ -277,7 +277,7 @@ func (r *LinkRegistry) Range(ctx context.Context, paths []string, f func(context
return errApplicationUID.WithCause(err).WithAttributes("application_uid", uid)
}
pb := &ttnpb.ApplicationLink{}
if err := ttnredis.GetProto(r.Redis, r.appKey(uid)).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.appKey(uid)).ScanProto(pb); err != nil {
return err
}
pb, err = applyLinkFieldMask(nil, pb, paths...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/claims_redis.tti.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *RedisClaimRegistry) Unclaim(ctx context.Context, ids ttnpb.Identifiers)
// GetPeerID looks up which from the given candidates has a claim on the given identifiers.
func (r *RedisClaimRegistry) GetPeerID(ctx context.Context, ids ttnpb.Identifiers, candidateIDs ...string) (string, error) {
results := make([]*redis.BoolCmd, len(candidateIDs))
_, err := r.Redis.Pipelined(func(tx redis.Pipeliner) error {
_, err := r.Redis.ReadOnlyClient().Pipelined(func(tx redis.Pipeliner) error {
for i, candidateID := range candidateIDs {
if ids, ok := ids.Identifiers().(*ttnpb.GatewayIdentifiers); ok && ids.EUI != nil {
results[i] = tx.SIsMember(
Expand Down
6 changes: 5 additions & 1 deletion pkg/events/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func WrapPubSub(wrapped events.PubSub, conf ttnredis.Config) (ps *PubSub) {
client: ttnRedisClient.Client,
eventChannel: ttnRedisClient.Key("events"),
closeWait: make(chan struct{}),

readClient: ttnRedisClient.ReadOnlyClient().Client,
}
return
}
Expand All @@ -50,12 +52,14 @@ type PubSub struct {
subOnce sync.Once
sub *redis.PubSub
closeWait chan struct{}

readClient *redis.Client
}

// Subscribe implements the events.Subscriber interface.
func (ps *PubSub) Subscribe(name string, hdl events.Handler) error {
ps.subOnce.Do(func() {
ps.sub = ps.client.Subscribe(ps.eventChannel)
ps.sub = ps.readClient.Subscribe(ps.eventChannel)
go func() {
defer close(ps.closeWait)
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gatewayserver/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *GatewayConnectionStatsRegistry) Get(ctx context.Context, ids ttnpb.Gate
result := &ttnpb.GatewayConnectionStats{}
stats := &ttnpb.GatewayConnectionStats{}

retrieved, err := r.Redis.MGet(r.key(upKey, uid), r.key(downKey, uid), r.key(statusKey, uid)).Result()
retrieved, err := r.Redis.ReadOnlyClient().MGet(r.key(upKey, uid), r.key(downKey, uid), r.key(statusKey, uid)).Result()
if err != nil {
return nil, ttnredis.ConvertError(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/joinserver/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *DeviceRegistry) GetByID(ctx context.Context, appID ttnpb.ApplicationIde
defer trace.StartRegion(ctx, "get end device by id").End()

pb := &ttnpb.EndDevice{}
if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil {
return nil, err
}
return ttnpb.FilterGetEndDevice(pb, paths...)
Expand All @@ -97,7 +97,7 @@ func (r *DeviceRegistry) GetByEUI(ctx context.Context, joinEUI, devEUI types.EUI
defer trace.StartRegion(ctx, "get end device by eui").End()

pb := &ttnpb.EndDevice{}
if err := ttnredis.FindProto(r.Redis, r.euiKey(joinEUI, devEUI), func(uid string) (string, error) {
if err := ttnredis.FindProto(r.Redis.ReadOnlyClient(), r.euiKey(joinEUI, devEUI), func(uid string) (string, error) {
tntID, err := unique.ToTenantID(uid)
if err != nil {
return "", err
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *KeyRegistry) GetByID(ctx context.Context, joinEUI, devEUI types.EUI64,
defer trace.StartRegion(ctx, "get session keys").End()

pb := &ttnpb.SessionKeys{}
if err := ttnredis.GetProto(r.Redis, r.idKey(joinEUI, devEUI, id)).ScanProto(pb); err != nil {
if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.idKey(joinEUI, devEUI, id)).ScanProto(pb); err != nil {
return nil, err
}
return ttnpb.FilterGetSessionKeys(pb, paths...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/networkserver/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (r *DeviceRegistry) GetByEUI(ctx context.Context, joinEUI, devEUI types.EUI

ctxTntID := tenant.FromContext(ctx)
pb := &ttnpb.EndDevice{}
if err := ttnredis.FindProto(r.Redis, r.euiKey(joinEUI, devEUI), func(uid string) (string, error) {
if err := ttnredis.FindProto(r.Redis.ReadOnlyClient(), r.euiKey(joinEUI, devEUI), func(uid string) (string, error) {
tntID, err := unique.ToTenantID(uid)
if err != nil {
return "", err
Expand Down Expand Up @@ -129,7 +129,7 @@ func (r *DeviceRegistry) RangeByAddr(ctx context.Context, addr types.DevAddr, pa
defer trace.StartRegion(ctx, "range end devices by dev_addr").End()

ctxTntID := tenant.FromContext(ctx)
return ttnredis.FindProtosWithKeys(r.Redis, r.addrKey(addr), r.uidKey).Range(func(k string) (proto.Message, func() (bool, error)) {
return ttnredis.FindProtosWithKeys(r.Redis.ReadOnlyClient(), r.addrKey(addr), r.uidKey).Range(func(k string) (proto.Message, func() (bool, error)) {
tntID, err := unique.ToTenantID(k)
if err != nil {
return nil, nil
Expand Down
28 changes: 27 additions & 1 deletion pkg/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func Key(ks ...string) string {
type Client struct {
*redis.Client
namespace string

readClient *Client
}

// Config represents Redis configuration.
Expand All @@ -87,6 +89,8 @@ type Config struct {
PoolSize int `name:"pool-size" description:"The maximum number of database connections"`
Failover FailoverConfig `name:"failover" description:"Redis failover configuration"`
namespace []string

ReadOnly ReadOnlyConfig `name:"readonly"`
}

func (c Config) WithNamespace(namespace ...string) *Config {
Expand Down Expand Up @@ -147,10 +151,32 @@ func newRedisClient(conf *Config) *redis.Client {

// New returns a new initialized Redis store.
func New(conf *Config) *Client {
return &Client{
cl := &Client{
namespace: Key(append(conf.RootNamespace, conf.namespace...)...),
Client: newRedisClient(conf),
}

readOnlyConfig := Config{
Address: conf.ReadOnly.Address,
Database: conf.ReadOnly.Database,
Password: conf.ReadOnly.Password,
PoolSize: conf.ReadOnly.PoolSize,
}
if !readOnlyConfig.IsZero() {
cl.readClient = &Client{
namespace: Key(append(conf.RootNamespace, conf.namespace...)...),
Client: newRedisClient(&readOnlyConfig),
}
}
return cl
}

// ReadOnlyClient returns a client with for read only operations.
func (cl *Client) ReadOnlyClient() *Client {
if cl.readClient != nil {
return cl.readClient
}
return cl
}

// Key constructs the full key for entity identified by ks by prepending the configured namespace and joining ks using the default separator.
Expand Down
11 changes: 11 additions & 0 deletions pkg/redis/redis.tti.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright © 2020 The Things Industries B.V.

package redis

// ReadOnlyConfig represents Redis read-only configuration.
type ReadOnlyConfig struct {
Address string `name:"address" description:"Address of the Redis server"`
Password string `name:"password" description:"Password of the Redis server"`
Database int `name:"database" description:"Redis database to use"`
PoolSize int `name:"pool-size" description:"The maximum number of database connections"`
}

0 comments on commit cd0d0b2

Please sign in to comment.