Skip to content

Commit

Permalink
Merge branch 'custom-routes' of github.com:allegro/envoy-control into…
Browse files Browse the repository at this point in the history
… custom-routes
  • Loading branch information
KSmigielski committed Nov 18, 2024
2 parents f60d93e + ee40534 commit 8cc1f4a
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 109 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.22.3]

## [0.22.5]
### Changed
- Add possibility to create custom routes

## [0.22.4]
- Added possibility for configuring priorities per service

## [0.22.3]
### Changed
- Changed names of some metrics

## [0.22.2]
### Changed
- Migrated metrics to prometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import io.envoyproxy.controlplane.cache.XdsRequest
import io.micrometer.core.instrument.MeterRegistry
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlMetrics
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHANGE_WATCHER_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.WATCH_TYPE_TAG
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
Expand All @@ -38,7 +38,7 @@ internal class GroupChangeWatcher(
return groupsChanged
.measureBuffer("group-change-watcher", meterRegistry)
.checkpoint("group-change-watcher-emitted")
.name(REACTOR_METRIC)
.name(CHANGE_WATCHER_METRIC)
.tag(WATCH_TYPE_TAG, "group")
.metrics()
.doOnSubscribe {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package pl.allegro.tech.servicemesh.envoycontrol.server.callbacks

import io.envoyproxy.controlplane.cache.Resources
import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tags
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTIONS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CONNECTION_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.DISCOVERY_REQ_TYPE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REQUESTS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.STREAM_TYPE_TAG
import java.util.concurrent.atomic.AtomicInteger
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest as V3DeltaDiscoveryRequest
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest as V3DiscoveryRequest

class MetricsDiscoveryServerCallbacks(private val meterRegistry: MeterRegistry) : DiscoveryServerCallbacks {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class LoadBalancingProperties {
var policy = Cluster.LbPolicy.LEAST_REQUEST
var useKeysSubsetFallbackPolicy = true
var priorities = LoadBalancingPriorityProperties()
var servicePriorities: Map<String, LoadBalancingPriorityProperties> = mapOf()
}

class CanaryProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.COMMUNICATION_MODE_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.ParallelizableScheduler
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICE_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SIMPLE_CACHE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.STATUS_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_GROUP_ERROR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_UPDATE_DURATION_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.UPDATE_TRIGGER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.doOnNextScheduledOn
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
Expand Down Expand Up @@ -60,12 +62,10 @@ class SnapshotUpdater(
// step 2: only watches groups. if groups change we use the last services state and update those groups
groups().subscribeOn(globalSnapshotScheduler)
)
.measureBuffer("snapshot-updater", meterRegistry, innerSources = 2)
.measureBuffer("snapshot-updater-merged", meterRegistry, innerSources = 2)
.checkpoint("snapshot-updater-merged")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "merged")
.tag(UPDATE_TRIGGER_TAG, "global")
.name(SNAPSHOT_METRIC)
.tag(CHECKPOINT_TAG, "merged")
.metrics()
// step 3: group updates don't provide a snapshot,
// so we piggyback the last updated snapshot state for use
Expand Down Expand Up @@ -101,20 +101,17 @@ class SnapshotUpdater(
// see GroupChangeWatcher
return onGroupAdded
.publishOn(globalSnapshotScheduler)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-groups-published", meterRegistry)
.checkpoint("snapshot-updater-groups-published")
.name(SNAPSHOT_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.map { groups ->
UpdateResult(action = Action.SERVICES_GROUP_ADDED, groups = groups)
}
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "snapshot-updater")
.tag(SNAPSHOT_STATUS_TAG, "published")
.tag(UPDATE_TRIGGER_TAG, "groups")
.metrics()
.onErrorResume { e ->
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
Tags.of(UPDATE_TRIGGER_TAG, "groups", METRIC_EMITTER_TAG, "snapshot-updater")
SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "groups")
)
.increment()
logger.error("Unable to process new group", e)
Expand All @@ -124,17 +121,16 @@ class SnapshotUpdater(

internal fun services(states: Flux<MultiClusterState>): Flux<UpdateResult> {
return states
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "sampled")
.onBackpressureLatestMeasured("snapshot-updater", meterRegistry)
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "sampled")
.metrics()
.onBackpressureLatestMeasured("snapshot-updater-services-sampled", meterRegistry)
// prefetch = 1, instead of default 256, to avoid processing stale states in case of backpressure
.publishOn(globalSnapshotScheduler, 1)
.measureBuffer("snapshot-updater", meterRegistry)
.measureBuffer("snapshot-updater-services-published", meterRegistry)
.checkpoint("snapshot-updater-services-published")
.name(REACTOR_METRIC)
.tag(UPDATE_TRIGGER_TAG, "services")
.tag(STATUS_TAG, "published")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
.createClusterConfigurations()
.map { (states, clusters) ->
Expand Down Expand Up @@ -163,8 +159,7 @@ class SnapshotUpdater(
.filter { it != emptyUpdateResult }
.onErrorResume { e ->
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
Tags.of(METRIC_EMITTER_TAG, "snapshot-updater", UPDATE_TRIGGER_TAG, "services")
SNAPSHOT_ERROR_METRIC, Tags.of(UPDATE_TRIGGER_TAG, "services")
).increment()
logger.error("Unable to process service changes", e)
Mono.justOrEmpty(UpdateResult(action = Action.ERROR_PROCESSING_CHANGES))
Expand All @@ -187,18 +182,17 @@ class SnapshotUpdater(
}
} catch (e: Throwable) {
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
SNAPSHOT_GROUP_ERROR_METRIC,
Tags.of(
SERVICE_TAG, group.serviceName,
OPERATION_TAG, "create-snapshot",
METRIC_EMITTER_TAG, "snapshot-updater"
OPERATION_TAG, "create-snapshot"
)
).increment()
logger.error("Unable to create snapshot for group ${group.serviceName}", e)
}
}

private val updateSnapshotForGroupsTimer = meterRegistry.timer("snapshot.update.duration.seconds")
private val updateSnapshotForGroupsTimer = meterRegistry.timer(SNAPSHOT_UPDATE_DURATION_METRIC)

private fun updateSnapshotForGroups(
groups: Collection<Group>,
Expand All @@ -213,7 +207,7 @@ class SnapshotUpdater(
} else if (result.xdsSnapshot != null && group.communicationMode == XDS) {
updateSnapshotForGroup(group, result.xdsSnapshot)
} else {
meterRegistry.counter(ERRORS_TOTAL_METRIC, Tags.of("type", "communication-mode")).increment()
meterRegistry.counter(COMMUNICATION_MODE_ERROR_METRIC).increment()
logger.error(
"Requested snapshot for ${group.communicationMode.name} mode, but it is not here. " +
"Handling Envoy with not supported communication mode should have been rejected before." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class EnvoyEndpointsFactory(
?.map {
createLbEndpoint(it, serviceInstances.serviceName, locality)
} ?: emptyList())
.setPriority(toEnvoyPriority(zone, locality))
.setPriority(toEnvoyPriority(zone, locality, serviceInstances))
.build()
}

Expand Down Expand Up @@ -286,8 +286,14 @@ class EnvoyEndpointsFactory(
false -> this
}

private fun toEnvoyPriority(zone: String, locality: Locality): Int {
val zonePriorities = properties.loadBalancing.priorities.zonePriorities
private fun toEnvoyPriority(zone: String, locality: Locality, serviceInstances: ServiceInstances?): Int {
var zonePriorities = properties.loadBalancing.priorities.zonePriorities
serviceInstances?.let {
if (properties.loadBalancing.servicePriorities.containsKey(serviceInstances.serviceName)) {
zonePriorities =
properties.loadBalancing.servicePriorities[serviceInstances.serviceName]!!.zonePriorities
}
}
return when (zonePriorities.isNotEmpty()) {
true -> zonePriorities[currentZone]?.get(zone) ?: toEnvoyPriority(locality)
false -> toEnvoyPriority(locality)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState.Companion.toMultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.logSuppressedError
import pl.allegro.tech.servicemesh.envoycontrol.utils.measureBuffer
import pl.allegro.tech.servicemesh.envoycontrol.utils.onBackpressureLatestMeasured
Expand Down Expand Up @@ -47,8 +46,8 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combinator", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes-combinator")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "combined")
.metrics()
}

Expand Down Expand Up @@ -76,12 +75,13 @@ class GlobalStateChanges(
.logSuppressedError("combineLatest() suppressed exception")
.measureBuffer("global-service-changes-combine-latest", meterRegistry)
.checkpoint("global-service-changes-emitted")
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "global-service-changes")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "emitted")
.metrics()
.onBackpressureLatestMeasured("global-service-changes-backpressure", meterRegistry)
.publishOn(scheduler, 1)
.checkpoint("global-service-changes-published")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "published")
.metrics()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package pl.allegro.tech.servicemesh.envoycontrol.synchronization
import pl.allegro.tech.servicemesh.envoycontrol.EnvoyControlProperties
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterStateChanges
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.REACTOR_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CHECKPOINT_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_METRIC
import reactor.core.publisher.Flux

class RemoteClusterStateChanges(
Expand All @@ -16,7 +16,7 @@ class RemoteClusterStateChanges(
.getChanges(properties.sync.pollingInterval)
.startWith(MultiClusterState.empty())
.distinctUntilChanged()
.name(REACTOR_METRIC)
.tag(METRIC_EMITTER_TAG, "cross-dc-synchronisation")
.name(SERVICES_STATE_METRIC)
.tag(CHECKPOINT_TAG, "cross-dc")
.metrics()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.CLUSTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_CANCELLED_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_SECONDS_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.CROSS_DC_SYNC_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.ERRORS_TOTAL_METRIC
import pl.allegro.tech.servicemesh.envoycontrol.utils.OPERATION_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.METRIC_EMITTER_TAG
import pl.allegro.tech.servicemesh.envoycontrol.utils.SERVICES_STATE_ERRORS_METRIC
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.lang.Integer.max
Expand Down Expand Up @@ -72,11 +71,10 @@ class RemoteServices(
.orTimeout(interval, TimeUnit.SECONDS)
.exceptionally {
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
SERVICES_STATE_ERRORS_METRIC,
Tags.of(
CLUSTER_TAG, cluster,
OPERATION_TAG, "get-state",
METRIC_EMITTER_TAG, "cross-dc-synchronization"
OPERATION_TAG, "get-state"
)
).increment()
logger.warn("Error synchronizing instances ${it.message}", it)
Expand All @@ -90,11 +88,10 @@ class RemoteServices(
cluster to instances
} catch (e: Exception) {
meterRegistry.counter(
ERRORS_TOTAL_METRIC,
SERVICES_STATE_ERRORS_METRIC,
Tags.of(
CLUSTER_TAG, cluster,
OPERATION_TAG, "get-instances",
METRIC_EMITTER_TAG, "cross-dc-synchronization"
OPERATION_TAG, "get-instances"
)
).increment()
logger.warn("Failed fetching instances from $cluster", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@ import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.noop.NoopTimer

val noopTimer = NoopTimer(Meter.Id("", Tags.empty(), null, null, Meter.Type.TIMER))
const val REACTOR_METRIC = "reactor"
const val ERRORS_TOTAL_METRIC = "errors.total"
const val CONNECTIONS_METRIC = "connections"
const val REQUESTS_METRIC = "requests.total"
const val WATCH_METRIC = "watch"
const val REACTOR_METRIC = "reactor.stream.stats"
const val REACTOR_DISCARDED_METRIC = "reactor.stream.discarded"
const val SERVICES_STATE_METRIC = "services.state"
const val SERVICES_STATE_ERRORS_METRIC = "services.state.errors.total"
const val SNAPSHOT_METRIC = "snapshot"
const val SNAPSHOT_UPDATE_DURATION_METRIC = "snapshot.update.duration.seconds"
const val SNAPSHOT_ERROR_METRIC = "snapshot.errors"
const val SNAPSHOT_GROUP_ERROR_METRIC = "snapshot.group.errors.total"
const val COMMUNICATION_MODE_ERROR_METRIC = "communication.errors.total"
const val CONNECTIONS_METRIC = "connection.stats"
const val REQUESTS_METRIC = "request.stats"
const val WATCH_ERRORS_METRIC = "services.watch.errors.total"
const val WATCH_METRIC = "services.watch"
const val ENVOY_CONTROL_WARM_UP_METRIC = "envoy.control.warmup.seconds"
const val CROSS_DC_SYNC_METRIC = "cross.dc.synchronization"
const val CROSS_DC_SYNC_CANCELLED_METRIC = "$CROSS_DC_SYNC_METRIC.cancelled.total"
Expand All @@ -19,6 +27,7 @@ const val SIMPLE_CACHE_METRIC = "simple.cache.duration.seconds"
const val PROTOBUF_CACHE_METRIC = "protobuf.cache.serialize.time"
const val CACHE_GROUP_COUNT_METRIC = "cache.groups.count"
const val SNAPSHOT_FACTORY_SECONDS_METRIC = "snapshot.factory.seconds"
const val CHANGE_WATCHER_METRIC = "group.change.watcher"

const val CONNECTION_TYPE_TAG = "connection-type"
const val STREAM_TYPE_TAG = "stream-type"
Expand All @@ -27,7 +36,6 @@ const val WATCH_TYPE_TAG = "watch-type"
const val DISCOVERY_REQ_TYPE_TAG = "discovery-request-type"
const val METRIC_TYPE_TAG = "metric-type"
const val METRIC_EMITTER_TAG = "metric-emitter"
const val SNAPSHOT_STATUS_TAG = "snapshot-status"
const val UPDATE_TRIGGER_TAG = "update-trigger"
const val SERVICE_TAG = "service"
const val OPERATION_TAG = "operation"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ fun <T> Flux<T>.measureBuffer(
fun <T> Flux<T>.measureDiscardedItems(name: String, meterRegistry: MeterRegistry): Flux<T> = this
.doOnDiscard(Any::class.java) {
meterRegistry.counter(
REACTOR_METRIC,
METRIC_TYPE_TAG, "discarded-items",
REACTOR_DISCARDED_METRIC,
METRIC_EMITTER_TAG, name
).increment()
}
Expand Down
Loading

0 comments on commit 8cc1f4a

Please sign in to comment.