From f9f76415dcb2c7439fac9b67cd4b86abee3d51e6 Mon Sep 17 00:00:00 2001 From: Aggelos Kolaitis Date: Fri, 3 Jul 2020 19:52:42 +0300 Subject: [PATCH 1/5] all: Support Redis read replicas --- CHANGELOG.md | 4 +++- .../io/packages/redis/registry.go | 2 +- .../io/pubsub/redis/registry.go | 4 ++-- pkg/applicationserver/io/web/redis/registry.go | 4 ++-- pkg/applicationserver/redis/registry.go | 8 ++++---- pkg/cluster/claims_redis.tti.go | 2 +- pkg/gatewayserver/redis/registry.go | 2 +- pkg/joinserver/redis/registry.go | 6 +++--- pkg/networkserver/redis/registry.go | 4 ++-- pkg/redis/redis.go | 18 ++++++++++++++++++ 10 files changed, 37 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 572e5395b7..c8ca7039e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support for Redis read replicas, reducing the load on the read-write Redis master. + ### Changed ### Deprecated @@ -146,7 +148,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Missing target ns and as in parameters in device claiming sdk claim request +- Missing target ns and as in parameters in device claiming sdk claim request - Handling of device unsetting the ADR bit in uplink, after ADR has been started. - Invalid `oauth-server-address` in CLI config generated by `use` command when config file is already present. - Network Server now properly handles FPort 0 data uplinks carrying FOpts. diff --git a/pkg/applicationserver/io/packages/redis/registry.go b/pkg/applicationserver/io/packages/redis/registry.go index 3147ba8f40..49731a80c2 100644 --- a/pkg/applicationserver/io/packages/redis/registry.go +++ b/pkg/applicationserver/io/packages/redis/registry.go @@ -81,7 +81,7 @@ func (r *ApplicationPackagesRegistry) makeAssociationKeyFunc(devUID string) func func (r ApplicationPackagesRegistry) Get(ctx context.Context, ids ttnpb.ApplicationPackageAssociationIdentifiers, paths []string) (*ttnpb.ApplicationPackageAssociation, error) { pb := &ttnpb.ApplicationPackageAssociation{} defer trace.StartRegion(ctx, "get application package association by id").End() - if err := ttnredis.GetProto(r.Redis, r.associationKey(unique.ID(ctx, ids.EndDeviceIdentifiers), r.fPortStr(ids.FPort))).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.associationKey(unique.ID(ctx, ids.EndDeviceIdentifiers), r.fPortStr(ids.FPort))).ScanProto(pb); err != nil { return nil, err } return applyAssociationFieldMask(nil, pb, appendImplicitAssociationGetPaths(paths...)...) diff --git a/pkg/applicationserver/io/pubsub/redis/registry.go b/pkg/applicationserver/io/pubsub/redis/registry.go index ea0c3709b1..480759de67 100644 --- a/pkg/applicationserver/io/pubsub/redis/registry.go +++ b/pkg/applicationserver/io/pubsub/redis/registry.go @@ -76,7 +76,7 @@ func (r *PubSubRegistry) makeUIDKeyFunc(appUID string) func(id string) string { // Get implements pubsub.Registry. func (r PubSubRegistry) Get(ctx context.Context, ids ttnpb.ApplicationPubSubIdentifiers, paths []string) (*ttnpb.ApplicationPubSub, error) { pb := &ttnpb.ApplicationPubSub{} - if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.PubSubID)).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.PubSubID)).ScanProto(pb); err != nil { return nil, err } return applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...) @@ -101,7 +101,7 @@ func (r PubSubRegistry) Range(ctx context.Context, paths []string, f func(contex return errApplicationUID.WithCause(err).WithAttributes("application_uid", appUID, "pub_sub_id", psID) } pb := &ttnpb.ApplicationPubSub{} - if err := ttnredis.GetProto(r.Redis, r.uidKey(appUID, psID)).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(appUID, psID)).ScanProto(pb); err != nil { return err } if err != nil { diff --git a/pkg/applicationserver/io/web/redis/registry.go b/pkg/applicationserver/io/web/redis/registry.go index 2505a5b6de..3e885a8fd6 100644 --- a/pkg/applicationserver/io/web/redis/registry.go +++ b/pkg/applicationserver/io/web/redis/registry.go @@ -70,7 +70,7 @@ func (r *WebhookRegistry) makeIDKeyFunc(appUID string) func(id string) string { // Get implements WebhookRegistry. func (r WebhookRegistry) Get(ctx context.Context, ids ttnpb.ApplicationWebhookIdentifiers, paths []string) (*ttnpb.ApplicationWebhook, error) { pb := &ttnpb.ApplicationWebhook{} - if err := ttnredis.GetProto(r.Redis, r.idKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.WebhookID)).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.idKey(unique.ID(ctx, ids.ApplicationIdentifiers), ids.WebhookID)).ScanProto(pb); err != nil { return nil, err } return applyWebhookFieldMask(nil, pb, appendImplicitWebhookGetPaths(paths...)...) @@ -80,7 +80,7 @@ func (r WebhookRegistry) Get(ctx context.Context, ids ttnpb.ApplicationWebhookId func (r WebhookRegistry) List(ctx context.Context, ids ttnpb.ApplicationIdentifiers, paths []string) ([]*ttnpb.ApplicationWebhook, error) { var pbs []*ttnpb.ApplicationWebhook appUID := unique.ID(ctx, ids) - err := ttnredis.FindProtos(r.Redis, r.appKey(appUID), r.makeIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) { + err := ttnredis.FindProtos(r.Redis.ReadOnlyClient(), r.appKey(appUID), r.makeIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) { pb := &ttnpb.ApplicationWebhook{} return pb, func() (bool, error) { pb, err := applyWebhookFieldMask(nil, pb, appendImplicitWebhookGetPaths(paths...)...) diff --git a/pkg/applicationserver/redis/registry.go b/pkg/applicationserver/redis/registry.go index b99e7d6aae..7225d41b86 100644 --- a/pkg/applicationserver/redis/registry.go +++ b/pkg/applicationserver/redis/registry.go @@ -56,7 +56,7 @@ func (r *DeviceRegistry) Get(ctx context.Context, ids ttnpb.EndDeviceIdentifiers defer trace.StartRegion(ctx, "get end device").End() pb := &ttnpb.EndDevice{} - if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { return nil, err } return ttnpb.FilterGetEndDevice(pb, paths...) @@ -251,7 +251,7 @@ func (r *LinkRegistry) Get(ctx context.Context, ids ttnpb.ApplicationIdentifiers defer trace.StartRegion(ctx, "get link").End() pb := &ttnpb.ApplicationLink{} - if err := ttnredis.GetProto(r.Redis, r.appKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.appKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { return nil, err } return applyLinkFieldMask(nil, pb, paths...) @@ -263,7 +263,7 @@ var errApplicationUID = errors.DefineCorruption("application_uid", "invalid appl func (r *LinkRegistry) Range(ctx context.Context, paths []string, f func(context.Context, ttnpb.ApplicationIdentifiers, *ttnpb.ApplicationLink) bool) error { defer trace.StartRegion(ctx, "range links").End() - uids, err := r.Redis.SMembers(r.allKey(ctx)).Result() + uids, err := r.Redis.ReadOnlyClient().SMembers(r.allKey(ctx)).Result() if err != nil { return ttnredis.ConvertError(err) } @@ -277,7 +277,7 @@ func (r *LinkRegistry) Range(ctx context.Context, paths []string, f func(context return errApplicationUID.WithCause(err).WithAttributes("application_uid", uid) } pb := &ttnpb.ApplicationLink{} - if err := ttnredis.GetProto(r.Redis, r.appKey(uid)).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.appKey(uid)).ScanProto(pb); err != nil { return err } pb, err = applyLinkFieldMask(nil, pb, paths...) diff --git a/pkg/cluster/claims_redis.tti.go b/pkg/cluster/claims_redis.tti.go index fd37f6d8db..2d8416b6df 100644 --- a/pkg/cluster/claims_redis.tti.go +++ b/pkg/cluster/claims_redis.tti.go @@ -119,7 +119,7 @@ func (r *RedisClaimRegistry) Unclaim(ctx context.Context, ids ttnpb.Identifiers) // GetPeerID looks up which from the given candidates has a claim on the given identifiers. func (r *RedisClaimRegistry) GetPeerID(ctx context.Context, ids ttnpb.Identifiers, candidateIDs ...string) (string, error) { results := make([]*redis.BoolCmd, len(candidateIDs)) - _, err := r.Redis.Pipelined(func(tx redis.Pipeliner) error { + _, err := r.Redis.ReadOnlyClient().Pipelined(func(tx redis.Pipeliner) error { for i, candidateID := range candidateIDs { if ids, ok := ids.Identifiers().(*ttnpb.GatewayIdentifiers); ok && ids.EUI != nil { results[i] = tx.SIsMember( diff --git a/pkg/gatewayserver/redis/registry.go b/pkg/gatewayserver/redis/registry.go index e79744e566..158cc7e948 100644 --- a/pkg/gatewayserver/redis/registry.go +++ b/pkg/gatewayserver/redis/registry.go @@ -115,7 +115,7 @@ func (r *GatewayConnectionStatsRegistry) Get(ctx context.Context, ids ttnpb.Gate result := &ttnpb.GatewayConnectionStats{} stats := &ttnpb.GatewayConnectionStats{} - retrieved, err := r.Redis.MGet(r.key(upKey, uid), r.key(downKey, uid), r.key(statusKey, uid)).Result() + retrieved, err := r.Redis.ReadOnlyClient().MGet(r.key(upKey, uid), r.key(downKey, uid), r.key(statusKey, uid)).Result() if err != nil { return nil, ttnredis.ConvertError(err) } diff --git a/pkg/joinserver/redis/registry.go b/pkg/joinserver/redis/registry.go index 12fb58165d..b71f07b38a 100644 --- a/pkg/joinserver/redis/registry.go +++ b/pkg/joinserver/redis/registry.go @@ -82,7 +82,7 @@ func (r *DeviceRegistry) GetByID(ctx context.Context, appID ttnpb.ApplicationIde defer trace.StartRegion(ctx, "get end device by id").End() pb := &ttnpb.EndDevice{} - if err := ttnredis.GetProto(r.Redis, r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.uidKey(unique.ID(ctx, ids))).ScanProto(pb); err != nil { return nil, err } return ttnpb.FilterGetEndDevice(pb, paths...) @@ -97,7 +97,7 @@ func (r *DeviceRegistry) GetByEUI(ctx context.Context, joinEUI, devEUI types.EUI defer trace.StartRegion(ctx, "get end device by eui").End() pb := &ttnpb.EndDevice{} - if err := ttnredis.FindProto(r.Redis, r.euiKey(joinEUI, devEUI), func(uid string) (string, error) { + if err := ttnredis.FindProto(r.Redis.ReadOnlyClient(), r.euiKey(joinEUI, devEUI), func(uid string) (string, error) { tntID, err := unique.ToTenantID(uid) if err != nil { return "", err @@ -394,7 +394,7 @@ func (r *KeyRegistry) GetByID(ctx context.Context, joinEUI, devEUI types.EUI64, defer trace.StartRegion(ctx, "get session keys").End() pb := &ttnpb.SessionKeys{} - if err := ttnredis.GetProto(r.Redis, r.idKey(joinEUI, devEUI, id)).ScanProto(pb); err != nil { + if err := ttnredis.GetProto(r.Redis.ReadOnlyClient(), r.idKey(joinEUI, devEUI, id)).ScanProto(pb); err != nil { return nil, err } return ttnpb.FilterGetSessionKeys(pb, paths...) diff --git a/pkg/networkserver/redis/registry.go b/pkg/networkserver/redis/registry.go index 1dc0469dea..8099a1f0e6 100644 --- a/pkg/networkserver/redis/registry.go +++ b/pkg/networkserver/redis/registry.go @@ -101,7 +101,7 @@ func (r *DeviceRegistry) GetByEUI(ctx context.Context, joinEUI, devEUI types.EUI ctxTntID := tenant.FromContext(ctx) pb := &ttnpb.EndDevice{} - if err := ttnredis.FindProto(r.Redis, r.euiKey(joinEUI, devEUI), func(uid string) (string, error) { + if err := ttnredis.FindProto(r.Redis.ReadOnlyClient(), r.euiKey(joinEUI, devEUI), func(uid string) (string, error) { tntID, err := unique.ToTenantID(uid) if err != nil { return "", err @@ -129,7 +129,7 @@ func (r *DeviceRegistry) RangeByAddr(ctx context.Context, addr types.DevAddr, pa defer trace.StartRegion(ctx, "range end devices by dev_addr").End() ctxTntID := tenant.FromContext(ctx) - return ttnredis.FindProtosWithKeys(r.Redis, r.addrKey(addr), r.uidKey).Range(func(k string) (proto.Message, func() (bool, error)) { + return ttnredis.FindProtosWithKeys(r.Redis.ReadOnlyClient(), r.addrKey(addr), r.uidKey).Range(func(k string) (proto.Message, func() (bool, error)) { tntID, err := unique.ToTenantID(k) if err != nil { return nil, nil diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index ba47a066e6..5c872595bd 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -24,6 +24,7 @@ import ( "net" "strconv" "strings" + "sync" "time" "github.com/go-redis/redis/v7" @@ -76,6 +77,10 @@ func Key(ks ...string) string { type Client struct { *redis.Client namespace string + + config *Config + readClient *Client + readClientOnce sync.Once } // Config represents Redis configuration. @@ -150,9 +155,22 @@ func New(conf *Config) *Client { return &Client{ namespace: Key(append(conf.RootNamespace, conf.namespace...)...), Client: newRedisClient(conf), + config: conf, } } +// ReadOnlyClient returns a client with for read only operations. +func (cl *Client) ReadOnlyClient() *Client { + cl.readClientOnce.Do(func() { + cl.readClient = New(cl.config) + if err := cl.readClient.ReadOnly(); err != nil { + cl.readClient.Close() + cl.readClient = cl + } + }) + return cl.readClient +} + // Key constructs the full key for entity identified by ks by prepending the configured namespace and joining ks using the default separator. func (cl *Client) Key(ks ...string) string { return Key(append([]string{cl.namespace}, ks...)...) From ca942bd77667f0f0f24f4bad42d14458c7a1d2c4 Mon Sep 17 00:00:00 2001 From: Aggelos Kolaitis Date: Fri, 3 Jul 2020 20:15:26 +0300 Subject: [PATCH 2/5] all: Use separate config for Redis read replicas --- pkg/redis/redis.go | 36 ++++++++++++++++++++++-------------- pkg/redis/redis.tti.go | 11 +++++++++++ 2 files changed, 33 insertions(+), 14 deletions(-) create mode 100644 pkg/redis/redis.tti.go diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index 5c872595bd..758c2a012a 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -24,7 +24,6 @@ import ( "net" "strconv" "strings" - "sync" "time" "github.com/go-redis/redis/v7" @@ -78,9 +77,7 @@ type Client struct { *redis.Client namespace string - config *Config - readClient *Client - readClientOnce sync.Once + readClient *Client } // Config represents Redis configuration. @@ -92,6 +89,8 @@ type Config struct { PoolSize int `name:"pool-size" description:"The maximum number of database connections"` Failover FailoverConfig `name:"failover" description:"Redis failover configuration"` namespace []string + + ReadOnly ReadOnlyConfig } func (c Config) WithNamespace(namespace ...string) *Config { @@ -152,23 +151,32 @@ func newRedisClient(conf *Config) *redis.Client { // New returns a new initialized Redis store. func New(conf *Config) *Client { - return &Client{ + cl := &Client{ namespace: Key(append(conf.RootNamespace, conf.namespace...)...), Client: newRedisClient(conf), - config: conf, } + + readOnlyConfig := Config{ + Address: conf.ReadOnly.Address, + Database: conf.ReadOnly.Database, + Password: conf.ReadOnly.Password, + PoolSize: conf.ReadOnly.PoolSize, + } + if !readOnlyConfig.IsZero() { + cl.readClient = &Client{ + namespace: Key(append(conf.RootNamespace, conf.namespace...)...), + Client: newRedisClient(&readOnlyConfig), + } + } + return cl } // ReadOnlyClient returns a client with for read only operations. func (cl *Client) ReadOnlyClient() *Client { - cl.readClientOnce.Do(func() { - cl.readClient = New(cl.config) - if err := cl.readClient.ReadOnly(); err != nil { - cl.readClient.Close() - cl.readClient = cl - } - }) - return cl.readClient + if cl.readClient != nil { + return cl.readClient + } + return cl } // Key constructs the full key for entity identified by ks by prepending the configured namespace and joining ks using the default separator. diff --git a/pkg/redis/redis.tti.go b/pkg/redis/redis.tti.go new file mode 100644 index 0000000000..9ab276ba39 --- /dev/null +++ b/pkg/redis/redis.tti.go @@ -0,0 +1,11 @@ +// Copyright © 2020 The Things Industries B.V. + +package redis + +// ReadOnlyConfig represents Redis read-only configuration. +type ReadOnlyConfig struct { + Address string `name:"address" description:"Address of the Redis server"` + Password string `name:"password" description:"Password of the Redis server"` + Database int `name:"database" description:"Redis database to use"` + PoolSize int `name:"pool-size" description:"The maximum number of database connections"` +} From 7c58e22f1ce7c377fda82bdf9b771941f4d30dc0 Mon Sep 17 00:00:00 2001 From: Aggelos Kolaitis Date: Sat, 4 Jul 2020 12:40:44 +0300 Subject: [PATCH 3/5] doc: Add Redis read replicas configuration --- .../reference/configuration/the-things-stack.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/doc/content/reference/configuration/the-things-stack.md b/doc/content/reference/configuration/the-things-stack.md index 621da74aef..8b14baf30f 100644 --- a/doc/content/reference/configuration/the-things-stack.md +++ b/doc/content/reference/configuration/the-things-stack.md @@ -146,6 +146,13 @@ Or you can enable failover using [Redis Sentinel](https://redis.io/topics/sentin - `redis.failover.addresses`: List of addresses of the Redis Sentinel instances (required) - `redis.failover.master-name`: Redis Sentinel master name (required) +Further, you can reduce the load on the Redis master by specifying read-only configuration: + +- `redis.readonly.address`: Address of the Redis server +- `redis.readonly.password`: Password of the Redis server +- `redis.readonly.database`: Redis database to use +- `redis.readonly.pool-size`: The maximum size of the connection pool + ## Blob Options The `blob` options configure how {{% tts %}} reads or writes files such as pictures, the frequency plans repository or files required for Backend Interfaces interoperability. The `provider` field selects the provider that is used, and which other options are read. @@ -181,6 +188,13 @@ When using the `redis` backend, the global [Redis configuration]({{< ref "#redis - `events.redis.namespace`: Namespace for Redis keys - `events.redis.pool-size`: The maximum size of the connection pool +Similar to the global Redis configuration, you can reduce the load on the Redis master by specifying read-only configuration: + +- `events.redis.readonly.address`: Address of the Redis server +- `events.redis.readonly.password`: Password of the Redis server +- `events.redis.readonly.database`: Redis database to use +- `events.redis.readonly.pool-size`: The maximum size of the connection pool + With the `cloud` backend, the configured publish and subscribe URLs are passed to [the Go CDK](https://gocloud.dev/howto/pubsub/). - `events.cloud.publish-url`: URL for the topic to send events From d215413f05503ab2b2dd47a79dc0c889dc702c4e Mon Sep 17 00:00:00 2001 From: Aggelos Kolaitis Date: Tue, 7 Jul 2020 10:15:05 +0300 Subject: [PATCH 4/5] util: Use read-only Redis client for events PubSub --- pkg/events/redis/redis.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/events/redis/redis.go b/pkg/events/redis/redis.go index 5033b8908f..346de72ee4 100644 --- a/pkg/events/redis/redis.go +++ b/pkg/events/redis/redis.go @@ -32,6 +32,8 @@ func WrapPubSub(wrapped events.PubSub, conf ttnredis.Config) (ps *PubSub) { client: ttnRedisClient.Client, eventChannel: ttnRedisClient.Key("events"), closeWait: make(chan struct{}), + + readClient: ttnRedisClient.ReadOnlyClient().Client, } return } @@ -50,12 +52,14 @@ type PubSub struct { subOnce sync.Once sub *redis.PubSub closeWait chan struct{} + + readClient *redis.Client } // Subscribe implements the events.Subscriber interface. func (ps *PubSub) Subscribe(name string, hdl events.Handler) error { ps.subOnce.Do(func() { - ps.sub = ps.client.Subscribe(ps.eventChannel) + ps.sub = ps.readClient.Subscribe(ps.eventChannel) go func() { defer close(ps.closeWait) for { From 80314b571fda114ff68bf4b21a505799e13fb3a5 Mon Sep 17 00:00:00 2001 From: Aggelos Kolaitis Date: Tue, 7 Jul 2020 10:19:28 +0300 Subject: [PATCH 5/5] util: Explicitly set readonly configuration name --- pkg/redis/redis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index 758c2a012a..ea834080ef 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -90,7 +90,7 @@ type Config struct { Failover FailoverConfig `name:"failover" description:"Redis failover configuration"` namespace []string - ReadOnly ReadOnlyConfig + ReadOnly ReadOnlyConfig `name:"readonly"` } func (c Config) WithNamespace(namespace ...string) *Config {