Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 rolled back SnapshotUpdater metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 19, 2024
1 parent c1d47b1 commit 730d31d
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot-updater-merged").metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -96,8 +97,9 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater-groups-published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
Expand All @@ -119,11 +121,13 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
.name("snapshot-updater-services-sampled").metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name("snapshot-updater-services-published").metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down

0 comments on commit 730d31d

Please sign in to comment.