Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Oct 4, 2024
1 parent 5edcb15 commit be4e0cd
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 68 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ allprojects {
bytebuddy : '1.15.1',
re2j : '1.3',
xxhash : '0.10.1',
dropwizard : '4.2.26'
dropwizard : '4.2.26',
reactor_core_micrometer: '1.0.6'
]

dependencyManagement {
Expand Down
2 changes: 2 additions & 0 deletions envoy-control-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencies {
implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect'
api group: 'io.dropwizard.metrics', name: 'metrics-core', version: versions.dropwizard
api group: 'io.micrometer', name: 'micrometer-core'
implementation group: 'io.projectreactor', name: 'reactor-core-micrometer', version: versions.reactor_core_micrometer

implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j

api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.function.Consumer
Expand All @@ -34,9 +35,13 @@ internal class GroupChangeWatcher(

fun onGroupAdded(): Flux<List<Group>> {
return groupsChanged
.measureBuffer("group-change-watcher-emitted", meterRegistry)
.measureBuffer("group-change-watcher", meterRegistry)
.checkpoint("group-change-watcher-emitted")
.name("group-change-watcher-emitted").metrics()
.name("group_change_watcher")
.tap(Micrometer.metrics(meterRegistry))
.doOnSubscribe {
logger.info("Watching group changes")
}
.doOnCancel {
logger.warn("Cancelling watching group changes")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal class CachedProtoResourcesSerializer(
}

private val cache: Cache<Message, Any> = createCache("protobuf-cache")
private val timer = createTimer(reportMetrics, meterRegistry, "protobuf-cache.serialize.time")
private val timer = createTimer(reportMetrics, meterRegistry, "protobuf_cache.serialize.time")

private fun <K, V> createCache(cacheName: String): Cache<K, V> {
return if (reportMetrics) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks

import com.google.common.net.InetAddresses.increment
import io.envoyproxy.controlplane.cache.Resources
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionTypeTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.discoveryReqTypeTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.requestsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.streamTypeTag
import java.util.concurrent.atomic.AtomicInteger

class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks {
Expand Down Expand Up @@ -38,8 +42,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)

connectionsByType.forEach { (type, typeConnections) ->
meterRegistry.gauge(
"connections",
Tags.of("connection-type", "grpc", "stream-type", type.name.lowercase()),
connectionsMetricName,
Tags.of(connectionTypeTag, "grpc", streamTypeTag, type.name.lowercase()),
typeConnections
)
}
Expand All @@ -57,11 +61,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)

override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) {
meterRegistry.counter(
"requests.total",
requestsMetricName,
Tags.of(
"connection-type", "grpc",
"stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
"discovery-request-type", "total"
connectionTypeTag, "grpc",
streamTypeTag, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
discoveryReqTypeTag, "total"
)
)
.increment()
Expand All @@ -72,10 +76,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry)
request: V3DeltaDiscoveryRequest
) {
meterRegistry.counter(
"requests.total",
requestsMetricName,
Tags.of(
"connection-type", "grpc",
"stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "discovery-request-type", "delta"
connectionTypeTag, "grpc",
streamTypeTag, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(),
discoveryReqTypeTag, "delta"
)
)
.increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.serviceTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.snapshotStatusTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.statusTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.updateTriggerTag
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
Expand Down Expand Up @@ -51,12 +60,13 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot.updater.count.total", meterRegistry, innerSources = 2)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name("snapshot.updater.count.total")
.tag("status", "merged")
.tag("type", "global")
.metrics()
.name(reactorMetricName)
.tag(metricEmitterTag, "snapshot-updater")
.tag(snapshotStatusTag, "merged")
.tag(updateTriggerTag, "global")
.tap(Micrometer.metrics(meterRegistry))
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
.scan { previous: UpdateResult, newUpdate: UpdateResult ->
Expand Down Expand Up @@ -91,18 +101,20 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name("snapshot.updater.count.total")
.tag("type", "groups")
.tag("status", "published").metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(reactorMetricName)
.tag(metricEmitterTag, "snapshot-updater")
.tag(snapshotStatusTag, "published")
.tag(updateTriggerTag, "groups")
.tap(Micrometer.metrics(meterRegistry))
.onErrorResume { e ->
meterRegistry.counter(
"snapshot.updater.errors.total",
Tags.of("type", "groups")
errorsMetricName,
Tags.of(updateTriggerTag, "groups", metricEmitterTag, "snapshot-updater")
)
.increment()
logger.error("Unable to process new group", e)
Expand All @@ -112,19 +124,19 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot.updater.count.total", meterRegistry)
.name(reactorMetricName)
.tag(updateTriggerTag, "services")
.tag(statusTag, "sampled")
.tap(Micrometer.metrics(meterRegistry))
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot.updater.count.total", meterRegistry)
.measureBuffer("snapshot-updater", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name("snapshot.updater.count.total")
.tag("type", "services")
.tag("status", "published")
.metrics()
.name(reactorMetricName)
.tag(updateTriggerTag, "services")
.tag(statusTag, "published")
.tap(Micrometer.metrics(meterRegistry))
.createClusterConfigurations()
.map { (states, clusters) ->
var lastXdsSnapshot: GlobalSnapshot? = null
Expand Down Expand Up @@ -152,8 +164,8 @@ class SnapshotUpdater(
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
"snapshot.updater.errors.total",
Tags.of("type", "services")
errorsMetricName,
Tags.of(metricEmitterTag, "snapshot-updater", updateTriggerTag, "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
Expand All @@ -162,7 +174,7 @@ class SnapshotUpdater(

private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) {
meterRegistry.timer(
"simple.cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot")
"simple.cache.duration.seconds", Tags.of(serviceTag, serviceName, operationTag, "set-snapshot")
)
} else {
noopTimer
Expand All @@ -176,14 +188,13 @@ class SnapshotUpdater(
}
} catch (e: Throwable) {
meterRegistry.counter(
"snapshot.updater.errors.total", Tags.of("service", group.serviceName)
errorsMetricName, Tags.of(serviceTag, group.serviceName, operationTag, "create-snapshot")
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer =
meterRegistry.timer("snapshot.updater.duration.seconds", Tags.of("type", "groups"))
private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds")

private fun updateSnapshotForGroups(
groups: Collection<Group>,
Expand All @@ -198,8 +209,7 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter("snapshot.updater.errors.total", Tags.of("type", "communication-mode"))
.increment()
meterRegistry.counter(errorsMetricName, Tags.of("type", "communication-mode")).increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package pl.allegro.tech.servicemesh.envoycontrol.synchronization

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
import reactor.core.observability.micrometer.Micrometer
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers

Expand All @@ -15,8 +17,10 @@ class GlobalStateChanges(
private val meterRegistry: MeterRegistry,
private val properties: SyncProperties
) {
private val scheduler = Schedulers.newBoundedElastic(
Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator"
private val scheduler = Micrometer.timedScheduler(
Schedulers.newBoundedElastic(
Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator"
), meterRegistry, "schedulers", Tags.of("name", "global-service-changes-combinator")
)

fun combined(): Flux<MultiClusterState> {
Expand All @@ -43,9 +47,11 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.tap(Micrometer.metrics(meterRegistry))
.name("global-service-changes-emitted").metrics()
}

// todo
private fun combinedExperimentalFlow(
clusterStatesStreams: List<Flux<MultiClusterState>>
): Flux<MultiClusterState> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.utils.clusterTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName
import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag
import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.lang.Integer.max
Expand All @@ -30,14 +34,17 @@ class RemoteServices(
fun getChanges(interval: Long): Flux<MultiClusterState> {
val aclFlux: Flux<MultiClusterState> = Flux.create({ sink ->
scheduler.scheduleWithFixedDelay({
meterRegistry.timer("cross.dc.synchronization.seconds", Tags.of("operation", "get-multi-cluster-state"))
meterRegistry.timer(
"cross_dc_synchronization.seconds",
Tags.of(operationTag, "get-multi-cluster-state")
)
.recordCallable {
getChanges(sink::next, interval)
}
}, 0, interval, TimeUnit.SECONDS)
}, FluxSink.OverflowStrategy.LATEST)
return aclFlux.doOnCancel {
meterRegistry.counter("cross.dc.synchronization.cancelled").increment()
meterRegistry.counter("cross_dc_synchronization.cancelled").increment()
logger.warn("Cancelling cross dc sync")
}
}
Expand All @@ -62,8 +69,12 @@ class RemoteServices(
.orTimeout(interval, TimeUnit.SECONDS)
.exceptionally {
meterRegistry.counter(
"cross.dc.synchronization.errors.total",
Tags.of("cluster", cluster, "operation", "get-state")
errorsMetricName,
Tags.of(
clusterTag, cluster,
operationTag, "get-state",
metricEmitterTag, "cross-dc-synchronization"
)
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
clusterStateCache[cluster]
Expand All @@ -76,8 +87,12 @@ class RemoteServices(
cluster to instances
} catch (e: Exception) {
meterRegistry.counter(
"cross.dc.synchronization.errors.total",
Tags.of("cluster", cluster, "operation", "get-instances")
errorsMetricName,
Tags.of(
clusterTag, cluster,
operationTag, "get-instances",
metricEmitterTag, "cross-dc-synchronization"
)
).increment()
logger.warn("Failed fetching instances from $cluster", e)
cluster to emptyList()
Expand All @@ -89,7 +104,7 @@ class RemoteServices(
state: ServicesState
): ClusterState {
meterRegistry.counter(
"cross.dc.synchronization.total", Tags.of("cluster", cluster)
"cross_dc_synchronization.total", Tags.of(clusterTag, cluster)
)
.increment()
val clusterState = ClusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,18 @@ import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.noop.NoopTimer

val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER))
val reactorMetricName = "reactor"
val errorsMetricName = "errors.count"
val connectionsMetricName = "connections"
val requestsMetricName = "requests.total"
val connectionTypeTag = "connection-type"
val streamTypeTag = "stream-type"
val discoveryReqTypeTag = "discovery-request-type"
val metricTypeTag = "metric-type"
val metricEmitterTag = "metric-emitter"
val snapshotStatusTag = "snapshot-status"
val updateTriggerTag = "update-trigger"
val serviceTag = "service"
val operationTag = "operation"
val clusterTag = "cluster"
val statusTag = "status"
Loading

0 comments on commit be4e0cd

Please sign in to comment.