diff --git a/docs/configuration.md b/docs/configuration.md index 74fa5ecb..0e22b03c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -65,6 +65,7 @@ For **Duration** settings, the value should be be an integer followed by `ms`, ` | `logLevel` | `LOG_LEVEL` | String | `info` | Should be `debug`, `info`, `warn`, `error`, or `none`. To learn more, read [Logging](./logging.md). | | `bigSegmentsStaleAsDegraded` | `BIG_SEGMENTS_STALE_AS_DEGRADED` | Boolean | `false` | Indicates if environments should be considered degraded if Big Segments are not fully synchronized. | | `bigSegmentsStaleThreshold` | `BIG_SEGMENTS_STALE_THRESHOLD` | Duration | `5m` | Indicates how long until Big Segments should be considered stale. | +| n/a | `BATCH_FETCH_PERIOD` | Duration | `0` | The minimum latency between bulk fetching all data for a batch of new clients. Used only in proxy mode for streaming clients. Useful for reducing memory when under heavy load, as many clients can share a single data fetch. | _(1)_ The default values for `streamUri`, `baseUri`, and `clientSideBaseUri` are `https://stream.launchdarkly.com`, `https://sdk.launchdarkly.com`, and `https://clientsdk.launchdarkly.com`, respectively. You should never need to change these URIs unless you are either using a special instance of the LaunchDarkly service, in which case Support will tell you how to set them, or you are accessing LaunchDarkly using a reverse proxy or some other mechanism that rewrites URLs. diff --git a/internal/streams/stream_provider_server_side.go b/internal/streams/stream_provider_server_side.go index fa2696de..a6f58890 100644 --- a/internal/streams/stream_provider_server_side.go +++ b/internal/streams/stream_provider_server_side.go @@ -2,7 +2,9 @@ package streams import ( "net/http" + "os" "sync" + "time" "github.com/launchdarkly/ld-relay/v8/internal/sdkauth" @@ -31,7 +33,8 @@ type serverSideEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers - flightGroup singleflight.Group + flightGroup singleflight.Group + previousFlight time.Time } func (s *serverSideStreamProvider) Handler(credential sdkauth.ScopedCredential) http.HandlerFunc { @@ -109,6 +112,20 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou // getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay. func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, error) { data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) { + // We do not want to call this flight group too often, as it can use a lot of RAM. + // This will ensure that we don't call it more than once every BATCH_FETCH_PERIOD. + delayS, has := os.LookupEnv("BATCH_FETCH_PERIOD") + if has { + if delay, err := time.ParseDuration(delayS); err == nil { + if time.Since(r.previousFlight) < delay { + time.Sleep(delay - time.Since(r.previousFlight)) + } + } else { + r.loggers.Warnf("Ignoring invalid BATCH_FETCH_PERIOD: %s\n", delayS) + } + r.previousFlight = time.Now() + } + flags, err := r.store.GetAll(ldstoreimpl.Features()) if err != nil { @@ -126,6 +143,7 @@ func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, err {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, } + // This call uses a lot of system resources (RAM in particular). event := MakeServerSidePutEvent(allData) return event, nil })