diff --git a/build.gradle b/build.gradle index dd88f1c24..f93fbde2e 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,8 @@ allprojects { bytebuddy : '1.15.1', re2j : '1.3', xxhash : '0.10.1', - dropwizard : '4.2.26' + dropwizard : '4.2.26', + reactor_core_micrometer: '1.0.6' ] dependencyManagement { diff --git a/envoy-control-core/build.gradle b/envoy-control-core/build.gradle index 6d4295acf..b92d7dbe0 100644 --- a/envoy-control-core/build.gradle +++ b/envoy-control-core/build.gradle @@ -7,6 +7,8 @@ dependencies { implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect' api group: 'io.dropwizard.metrics', name: 'metrics-core', version: versions.dropwizard api group: 'io.micrometer', name: 'micrometer-core' + implementation group: 'io.projectreactor', name: 'reactor-core-micrometer', version: versions.reactor_core_micrometer + implementation group: 'com.google.re2j', name: 're2j', version: versions.re2j api group: 'io.envoyproxy.controlplane', name: 'server', version: versions.java_controlplane diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt index 522d96eae..65ab9512d 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/groups/GroupChangeWatcher.kt @@ -11,6 +11,7 @@ import io.micrometer.core.instrument.MeterRegistry import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics import pl.allegro.tech.servicemesh.envoycontrol.logger import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer +import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.util.function.Consumer @@ -34,9 +35,13 @@ internal class GroupChangeWatcher( fun onGroupAdded(): Flux> { return groupsChanged - .measureBuffer("group-change-watcher-emitted", meterRegistry) + .measureBuffer("group-change-watcher", meterRegistry) .checkpoint("group-change-watcher-emitted") - .name("group-change-watcher-emitted").metrics() + .name("group_change_watcher") + .tap(Micrometer.metrics(meterRegistry)) + .doOnSubscribe { + logger.info("Watching group changes") + } .doOnCancel { logger.warn("Cancelling watching group changes") } diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt index 3503b70a4..7664c41fc 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/CachedProtoResourcesSerializer.kt @@ -27,7 +27,7 @@ internal class CachedProtoResourcesSerializer( } private val cache: Cache = createCache("protobuf-cache") - private val timer = createTimer(reportMetrics, meterRegistry, "protobuf-cache.serialize.time") + private val timer = createTimer(reportMetrics, meterRegistry, "protobuf_cache.serialize.time") private fun createCache(cacheName: String): Cache { return if (reportMetrics) { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt index 6b1ca0ccd..cd9d43e12 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/server/callbacks/MetricsDiscoveryServerCallbacks.kt @@ -1,12 +1,16 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks -import com.google.common.net.InetAddresses.increment import io.envoyproxy.controlplane.cache.Resources import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Tags +import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionTypeTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.discoveryReqTypeTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.requestsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.streamTypeTag import java.util.concurrent.atomic.AtomicInteger class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks { @@ -38,8 +42,8 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) connectionsByType.forEach { (type, typeConnections) -> meterRegistry.gauge( - "connections", - Tags.of("connection-type", "grpc", "stream-type", type.name.lowercase()), + connectionsMetricName, + Tags.of(connectionTypeTag, "grpc", streamTypeTag, type.name.lowercase()), typeConnections ) } @@ -57,11 +61,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) override fun onV3StreamRequest(streamId: Long, request: V3DiscoveryRequest) { meterRegistry.counter( - "requests.total", + requestsMetricName, Tags.of( - "connection-type", "grpc", - "stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), - "discovery-request-type", "total" + connectionTypeTag, "grpc", + streamTypeTag, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + discoveryReqTypeTag, "total" ) ) .increment() @@ -72,10 +76,11 @@ class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) request: V3DeltaDiscoveryRequest ) { meterRegistry.counter( - "requests.total", + requestsMetricName, Tags.of( - "connection-type", "grpc", - "stream-type", StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), "discovery-request-type", "delta" + connectionTypeTag, "grpc", + streamTypeTag, StreamType.fromTypeUrl(request.typeUrl).name.lowercase(), + discoveryReqTypeTag, "delta" ) ) .increment() diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt index 0778633d4..61b3cbc9a 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdater.kt @@ -15,6 +15,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.noopTimer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured +import pl.allegro.tech.servicemesh.envoycontrol.utils.reactorMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.serviceTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.snapshotStatusTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.statusTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.updateTriggerTag +import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler @@ -51,12 +60,13 @@ class SnapshotUpdater( // step 2: only watches groups. if groups change we use the last services state and update those groups groups().subscribeOn(globalSnapshotScheduler) ) - .measureBuffer("snapshot.updater.count.total", meterRegistry, innerSources = 2) + .measureBuffer("snapshot-updater", meterRegistry, innerSources = 2) .checkpoint("snapshot-updater-merged") - .name("snapshot.updater.count.total") - .tag("status", "merged") - .tag("type", "global") - .metrics() + .name(reactorMetricName) + .tag(metricEmitterTag, "snapshot-updater") + .tag(snapshotStatusTag, "merged") + .tag(updateTriggerTag, "global") + .tap(Micrometer.metrics(meterRegistry)) // step 3: group updates don't provide a snapshot, // so we piggyback the last updated snapshot state for use .scan { previous: UpdateResult, newUpdate: UpdateResult -> @@ -91,18 +101,20 @@ class SnapshotUpdater( // see GroupChangeWatcher return onGroupAdded .publishOn(globalSnapshotScheduler) - .measureBuffer("snapshot.updater.count.total", meterRegistry) + .measureBuffer("snapshot-updater", meterRegistry) .checkpoint("snapshot-updater-groups-published") - .name("snapshot.updater.count.total") - .tag("type", "groups") - .tag("status", "published").metrics() .map { groups -> UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups) } + .name(reactorMetricName) + .tag(metricEmitterTag, "snapshot-updater") + .tag(snapshotStatusTag, "published") + .tag(updateTriggerTag, "groups") + .tap(Micrometer.metrics(meterRegistry)) .onErrorResume { e -> meterRegistry.counter( - "snapshot.updater.errors.total", - Tags.of("type", "groups") + errorsMetricName, + Tags.of(updateTriggerTag, "groups", metricEmitterTag, "snapshot-updater") ) .increment() logger.error("Unable to process new group", e) @@ -112,19 +124,19 @@ class SnapshotUpdater( internal fun services(states: Flux): Flux { return states - .name("snapshot.updater.count.total") - .tag("type", "services") - .tag("status", "sampled") - .metrics() - .onBackpressureLatestMeasured("snapshot.updater.count.total", meterRegistry) + .name(reactorMetricName) + .tag(updateTriggerTag, "services") + .tag(statusTag, "sampled") + .tap(Micrometer.metrics(meterRegistry)) + .onBackpressureLatestMeasured("snapshot-updater", meterRegistry) // prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure .publishOn(globalSnapshotScheduler, 1) - .measureBuffer("snapshot.updater.count.total", meterRegistry) + .measureBuffer("snapshot-updater", meterRegistry) .checkpoint("snapshot-updater-services-published") - .name("snapshot.updater.count.total") - .tag("type", "services") - .tag("status", "published") - .metrics() + .name(reactorMetricName) + .tag(updateTriggerTag, "services") + .tag(statusTag, "published") + .tap(Micrometer.metrics(meterRegistry)) .createClusterConfigurations() .map { (states, clusters) -> var lastXdsSnapshot: GlobalSnapshot? = null @@ -152,8 +164,8 @@ class SnapshotUpdater( .filter { it != emptyUpdateResult } .onErrorResume { e -> meterRegistry.counter( - "snapshot.updater.errors.total", - Tags.of("type", "services") + errorsMetricName, + Tags.of(metricEmitterTag, "snapshot-updater", updateTriggerTag, "services") ).increment() logger.error("Unable to process service changes", e) Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES)) @@ -162,7 +174,7 @@ class SnapshotUpdater( private fun snapshotTimer(serviceName: String) = if (properties.metrics.cacheSetSnapshot) { meterRegistry.timer( - "simple.cache.duration.seconds", Tags.of("service", serviceName, "operation", "set-snapshot") + "simple.cache.duration.seconds", Tags.of(serviceTag, serviceName, operationTag, "set-snapshot") ) } else { noopTimer @@ -176,14 +188,13 @@ class SnapshotUpdater( } } catch (e: Throwable) { meterRegistry.counter( - "snapshot.updater.errors.total", Tags.of("service", group.serviceName) + errorsMetricName, Tags.of(serviceTag, group.serviceName, operationTag, "create-snapshot") ).increment() logger.error("Unable to create snapshot for group ${group.serviceName}", e) } } - private val updateSnapshotForGroupsTimer = - meterRegistry.timer("snapshot.updater.duration.seconds", Tags.of("type", "groups")) + private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds") private fun updateSnapshotForGroups( groups: Collection, @@ -198,8 +209,7 @@ class SnapshotUpdater( } else if (result.xdsSnapshot != null && group.communicationMode == XDS) { updateSnapshotForGroup(group, result.xdsSnapshot) } else { - meterRegistry.counter("snapshot.updater.errors.total", Tags.of("type", "communication-mode")) - .increment() + meterRegistry.counter(errorsMetricName, Tags.of("type", "communication-mode")).increment() logger.error( "Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " + "Handling Envoy with not supported communication mode should have been rejected before." + diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt index 4a99ad39a..91e524135 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/GlobalStateChanges.kt @@ -1,12 +1,14 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured +import reactor.core.observability.micrometer.Micrometer import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers @@ -15,8 +17,10 @@ class GlobalStateChanges( private val meterRegistry: MeterRegistry, private val properties: SyncProperties ) { - private val scheduler = Schedulers.newBoundedElastic( - Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" + private val scheduler = Micrometer.timedScheduler( + Schedulers.newBoundedElastic( + Int.MAX_VALUE, Int.MAX_VALUE, "global-service-changes-combinator" + ), meterRegistry, "schedulers", Tags.of("name", "global-service-changes-combinator") ) fun combined(): Flux { @@ -43,9 +47,11 @@ class GlobalStateChanges( .logSuppressedError("combineLatest() suppressed exception") .measureBuffer("global-service-changes-combine-latest", meterRegistry) .checkpoint("global-service-changes-emitted") + .tap(Micrometer.metrics(meterRegistry)) .name("global-service-changes-emitted").metrics() } + // todo private fun combinedExperimentalFlow( clusterStatesStreams: List> ): Flux { diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt index 6a3492a3e..1dc508495 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/synchronization/RemoteServices.kt @@ -8,6 +8,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.Locality import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState +import pl.allegro.tech.servicemesh.envoycontrol.utils.clusterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.operationTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink import java.lang.Integer.max @@ -30,14 +34,17 @@ class RemoteServices( fun getChanges(interval: Long): Flux { val aclFlux: Flux = Flux.create({ sink -> scheduler.scheduleWithFixedDelay({ - meterRegistry.timer("cross.dc.synchronization.seconds", Tags.of("operation", "get-multi-cluster-state")) + meterRegistry.timer( + "cross_dc_synchronization.seconds", + Tags.of(operationTag, "get-multi-cluster-state") + ) .recordCallable { getChanges(sink::next, interval) } }, 0, interval, TimeUnit.SECONDS) }, FluxSink.OverflowStrategy.LATEST) return aclFlux.doOnCancel { - meterRegistry.counter("cross.dc.synchronization.cancelled").increment() + meterRegistry.counter("cross_dc_synchronization.cancelled").increment() logger.warn("Cancelling cross dc sync") } } @@ -62,8 +69,12 @@ class RemoteServices( .orTimeout(interval, TimeUnit.SECONDS) .exceptionally { meterRegistry.counter( - "cross.dc.synchronization.errors.total", - Tags.of("cluster", cluster, "operation", "get-state") + errorsMetricName, + Tags.of( + clusterTag, cluster, + operationTag, "get-state", + metricEmitterTag, "cross-dc-synchronization" + ) ).increment() logger.warn("Error synchronizing instances ${it.message}", it) clusterStateCache[cluster] @@ -76,8 +87,12 @@ class RemoteServices( cluster to instances } catch (e: Exception) { meterRegistry.counter( - "cross.dc.synchronization.errors.total", - Tags.of("cluster", cluster, "operation", "get-instances") + errorsMetricName, + Tags.of( + clusterTag, cluster, + operationTag, "get-instances", + metricEmitterTag, "cross-dc-synchronization" + ) ).increment() logger.warn("Failed fetching instances from $cluster", e) cluster to emptyList() @@ -89,7 +104,7 @@ class RemoteServices( state: ServicesState ): ClusterState { meterRegistry.counter( - "cross.dc.synchronization.total", Tags.of("cluster", cluster) + "cross_dc_synchronization.total", Tags.of(clusterTag, cluster) ) .increment() val clusterState = ClusterState( diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt index ce8f380d9..43b08f3e4 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/Metrics.kt @@ -5,3 +5,18 @@ import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.noop.NoopTimer val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER)) +val reactorMetricName = "reactor" +val errorsMetricName = "errors.count" +val connectionsMetricName = "connections" +val requestsMetricName = "requests.total" +val connectionTypeTag = "connection-type" +val streamTypeTag = "stream-type" +val discoveryReqTypeTag = "discovery-request-type" +val metricTypeTag = "metric-type" +val metricEmitterTag = "metric-emitter" +val snapshotStatusTag = "snapshot-status" +val updateTriggerTag = "update-trigger" +val serviceTag = "service" +val operationTag = "operation" +val clusterTag = "cluster" +val statusTag = "status" diff --git a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt index 00fccdc2d..1546645c3 100644 --- a/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt +++ b/envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/utils/ReactorUtils.kt @@ -1,6 +1,7 @@ package pl.allegro.tech.servicemesh.envoycontrol.utils import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tags import org.reactivestreams.Subscription import org.slf4j.LoggerFactory import reactor.core.Disposable @@ -49,7 +50,15 @@ fun Flux.measureBuffer( * operator and calculate difference between them */ fun Flux.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux = this - .doOnDiscard(Any::class.java) { meterRegistry.counter("reactor-discarded-items.$name").increment() } + .doOnDiscard(Any::class.java) { + meterRegistry.counter( + reactorMetricName, + metricTypeTag, + "discarded-items", + metricEmitterTag, + name + ).increment() + } fun Flux.onBackpressureLatestMeasured(name: String, meterRegistry: MeterRegistry): Flux = measureDiscardedItems("$name-before", meterRegistry) @@ -105,7 +114,12 @@ private fun measureQueueSubscriptionBuffer( name: String, meterRegistry: MeterRegistry ) { - meterRegistry.gauge(bufferMetric(name), subscription, queueSubscriptionBufferExtractor) + meterRegistry.gauge( + reactorMetricName, + Tags.of(metricTypeTag, "buffer-size", metricEmitterTag, name), + subscription, + queueSubscriptionBufferExtractor + ) } private fun measureScannableBuffer( @@ -116,12 +130,19 @@ private fun measureScannableBuffer( ) { val buffered = scannable.scan(Scannable.Attr.BUFFERED) if (buffered == null) { - logger.error("Cannot register metric '${bufferMetric(name)}'. Buffer size not available. " + - "Use measureBuffer() only on supported reactor operators") + logger.error( + "Cannot register metric $reactorMetricName 'with ${metricEmitterTag}: $name'. Buffer size not available. " + + "Use measureBuffer() only on supported reactor operators" + ) return } - meterRegistry.gauge(bufferMetric(name), scannable, scannableBufferExtractor) + meterRegistry.gauge( + reactorMetricName, + Tags.of(metricTypeTag, "buffer-size", metricEmitterTag, name), + scannable, + scannableBufferExtractor + ) /** * Special case for FlatMap derived operators like merge(). The main buffer attribute doesn't return actual @@ -131,7 +152,12 @@ private fun measureScannableBuffer( * be available, so it must be stated explicitly as innerSources parameter. */ for (i in 0 until innerSources) { - meterRegistry.gauge("${bufferMetric(name)}_$i", scannable, innerBufferExtractor(i)) + meterRegistry.gauge( + reactorMetricName, + Tags.of(metricTypeTag, "buffer-size", "${(name)}_$i"), + scannable, + innerBufferExtractor(i) + ) } } @@ -142,9 +168,8 @@ private fun innerBufferExtractor(index: Int) = { s: Scannable -> ?.let(scannableBufferExtractor) ?: -1.0 } -private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> s.size.toDouble() } -private fun bufferMetric(name: String) = "reactor-buffers.$name" +private val queueSubscriptionBufferExtractor = { s: Fuseable.QueueSubscription<*> -> s.size.toDouble() } sealed class ParallelizableScheduler object DirectScheduler : ParallelizableScheduler() @@ -160,6 +185,7 @@ fun Flux.doOnNextScheduledOn( is DirectScheduler -> { doOnNext(doOnNext) } + is ParallelScheduler -> { this.parallel(scheduler.parallelism) .runOn(scheduler.scheduler) diff --git a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt index 1d3fd3935..a890ee2b9 100644 --- a/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt +++ b/envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/SnapshotUpdaterTest.kt @@ -60,6 +60,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.DirectScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler import pl.allegro.tech.servicemesh.envoycontrol.utils.any +import pl.allegro.tech.servicemesh.envoycontrol.utils.errorsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.metricEmitterTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.serviceTag import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers @@ -469,8 +472,8 @@ class SnapshotUpdaterTest { val snapshot = cache.getSnapshot(servicesGroup) assertThat(snapshot).isEqualTo(null) assertThat( - simpleMeterRegistry.find("snapshot.updater.errors.total") - .tags(Tags.of("service", "example-service")) + simpleMeterRegistry.find(errorsMetricName) + .tags(Tags.of(serviceTag, "example-service", metricEmitterTag, "snapshot-updater")) .counter()?.count() ).isEqualTo(1.0) } diff --git a/envoy-control-services/build.gradle b/envoy-control-services/build.gradle index 724d8e4a2..ffb109fc7 100644 --- a/envoy-control-services/build.gradle +++ b/envoy-control-services/build.gradle @@ -1,4 +1,5 @@ dependencies { implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib' api group: 'io.projectreactor', name: 'reactor-core' + api group: 'io.projectreactor', name: 'reactor-core-micrometer', version: versions.reactor_core_micrometer } diff --git a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt index ea54ec40f..9ad0822d9 100644 --- a/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt +++ b/envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/MetricsDiscoveryServerCallbacksTest.kt @@ -20,6 +20,11 @@ import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscover import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.RDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.SDS import pl.allegro.tech.servicemesh.envoycontrol.server.callbacks.MetricsDiscoveryServerCallbacks.StreamType.UNKNOWN +import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionTypeTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.connectionsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.discoveryReqTypeTag +import pl.allegro.tech.servicemesh.envoycontrol.utils.requestsMetricName +import pl.allegro.tech.servicemesh.envoycontrol.utils.streamTypeTag import java.util.function.Consumer import java.util.function.Predicate @@ -228,18 +233,20 @@ interface MetricsDiscoveryServerCallbacksTest { // given val meterRegistry = envoyControl().app.meterRegistry() consul().server.operations.registerService(service(), name = "echo") - + for (meter in meterRegistry.meters) { + print(meter.toString()) + } // expect untilAsserted { expectedGrpcConnectionsGaugeValues().forEach { (type, value) -> - val metric = "connections" + val metric = connectionsMetricName assertThat( meterRegistry.find(metric) - .tags(Tags.of("stream-type", type.name.lowercase(), "connection-type", "grpc")).gauge() + .tags(Tags.of(streamTypeTag, type.name.lowercase(), connectionTypeTag, "grpc")).gauge() ).isNotNull assertThat( meterRegistry.get(metric) - .tags(Tags.of("stream-type", type.name.lowercase(), "connection-type", "grpc")).gauge().value() + .tags(Tags.of(streamTypeTag, type.name.lowercase(), connectionTypeTag, "grpc")).gauge().value() .toInt() ).isEqualTo(value) } @@ -261,8 +268,8 @@ interface MetricsDiscoveryServerCallbacksTest { private fun assertCondition(type: String, condition: Predicate, reqTpe: String) { val counterValue = - envoyControl().app.meterRegistry().find("requests.total") - .tags(Tags.of("stream-type", type, "discovery-request-type", reqTpe, "connection-type", "grpc")) + envoyControl().app.meterRegistry().find(requestsMetricName) + .tags(Tags.of(streamTypeTag, type, discoveryReqTypeTag, reqTpe, connectionTypeTag, "grpc")) .counter()?.count()?.toInt() logger.info("$type $counterValue") assertThat(counterValue).satisfies(Consumer { condition.test(it) })