Skip to content

Commit

Permalink
allegro-internal/flex-roadmap#819 renamed SnapshotUpdater metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 19, 2024
1 parent 730d31d commit 84e0174
Showing 1 changed file with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ import io.envoyproxy.controlplane.cache.SnapshotCache
import io.envoyproxy.controlplane.cache.v3.Snapshot
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.Timer
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.ADS
import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_GROUP_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_UPDATE_DURATION_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
Expand Down Expand Up @@ -62,7 +65,9 @@ class SnapshotUpdater(
)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot-updater-merged").metrics()
.name(SNAPSHOT_METRIC)
.tag(CHECKPOINT_TAG, "merged")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -99,19 +104,15 @@ class SnapshotUpdater(
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot-updater-groups-published").metrics()
.name(SNAPSHOT_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(SNAPSHOT_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
.metrics()
.onErrorResume { e ->
meterRegistry.counter(
SNAPSHOT_ERROR_METRIC,
Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater")
SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "groups")
)
.increment()
logger.error("Unable to process new group", e)
Expand All @@ -121,13 +122,17 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot-updater-services-sampled").metrics()
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name("snapshot-updater-services-published").metrics()
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down Expand Up @@ -155,8 +160,7 @@ class SnapshotUpdater(
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
SNAPSHOT_ERROR_METRIC,
Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services")
SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
Expand Down Expand Up @@ -190,10 +194,13 @@ class SnapshotUpdater(
}
}

private val updateSnapshotForGroupsTimer = meterRegistry.timer(SNAPSHOT_UPDATE_DURATION_METRIC)

private fun updateSnapshotForGroups(
groups: Collection<Group>,
result: UpdateResult
): Mono<UpdateResult> {
val sample = Timer.start()
versions.retainGroups(cache.groups())
val results = Flux.fromIterable(groups)
.doOnNextScheduledOn(groupSnapshotScheduler) { group ->
Expand All @@ -211,6 +218,7 @@ class SnapshotUpdater(
}
}
return results.then(Mono.fromCallable {
sample.stop(updateSnapshotForGroupsTimer)
result
})
}
Expand Down

0 comments on commit 84e0174

Please sign in to comment.