From 07cd6cdf9f462dad1ec9c0a35d2862082ba2763c Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Wed, 22 Dec 2021 15:07:44 -0800
Subject: [PATCH] onComplete schedules next ask

---
 .../filodb.coordinator/ShardManager.scala     |   6 +-
 .../TenantIngestionMetering.scala             | 158 +++++++++---------
 2 files changed, 79 insertions(+), 85 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
index 59ac730b9f..bea562d0a0 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
 
   private val _tenantIngestionMeteringOpt =
     if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-      val inst = new TenantIngestionMetering(
+      Some(new TenantIngestionMetering(
         settings,
         () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
-        () => { _coordinators.head._2 })
-      inst.schedulePeriodicPublishJob()
-      Some(inst)
+        () => { _coordinators.head._2 }))
     } else None
 
   val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
index d88031b91d..7399af06d1 100644
--- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -1,7 +1,6 @@
 package filodb.coordinator
 
 import java.util.concurrent.{TimeoutException, TimeUnit}
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.concurrent.duration.FiniteDuration
 import scala.util.{Failure, Success}
@@ -18,24 +17,28 @@ import filodb.core.DatasetRef
 import filodb.query.{QueryError, QueryResult, TsCardinalities}
 
 object QueryThrottle {
-  // currently just add this diff to the delay if the timeout rate exceeds THRESHOLD
-  val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
+  // currently just add this diff to the interval if the on-time rate falls below THRESHOLD
+  protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
   // number of past query timeouts/non-timeouts to consider
-  val LOOKBACK = 10
-  // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the delay
-  val THRESHOLD = 0.85
+  protected val LOOKBACK = 10
+  // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the interval
+  protected val THRESHOLD = 0.85
+}
+
+object TenantIngestionMetering {
+  protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
+  protected val METRIC_TOTAL = "total_timeseries_by_tenant"
 }
 
 /**
  * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts.
  *
- * @param queryDelay the initial delay between each query. This is the duration to be adjusted.
+ * @param queryInterval the initial interval between queries. This is the duration to be adjusted.
 */
-class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
+class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
   import QueryThrottle._
 
-  private var delay: FiniteDuration = queryDelay.copy()
-  private val lock = new ReentrantReadWriteLock()
+  private var interval: FiniteDuration = queryInterval.copy()
 
   // these track timeouts for the past LOOKBACK queries
   private var bits = (1 << LOOKBACK) - 1
@@ -55,13 +58,13 @@
   }
   /**
-   * Updates the delay according to the timeout:non-timeoout ratio.
+   * Updates the interval according to the timeout:non-timeout ratio.
    */
-  private def updateDelay(): Unit = {
+  private def updateInterval(): Unit = {
     val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
     if (successRate < THRESHOLD) {
-      delay = delay + DELAY_DIFF
-      logger.info("too many timeouts; query delay extended to " + delay.toString())
+      interval = interval + INTERVAL_DIFF
+      logger.info("too many timeouts; query interval extended to " + interval.toString())
       // reset the bits
       bits = (1 << LOOKBACK) - 1
     }
   }
@@ -71,30 +74,23 @@
    * Record a query timeout.
    */
   def recordTimeout(): Unit = {
-    lock.writeLock().lock()
     setNextBit(false)
-    updateDelay()
-    lock.writeLock().unlock()
+    updateInterval()
   }
 
   /**
    * Record a query non-timeout.
   */
   def recordOnTime(): Unit = {
-    lock.writeLock().lock()
     setNextBit(true)
-    updateDelay()
-    lock.writeLock().unlock()
+    updateInterval()
  }
 
  /**
-   * Returns the current query delay.
+   * Returns the current query interval.
   */
-  def getDelay(): FiniteDuration = {
-    lock.readLock().lock()
-    val currDelay = delay.copy()
-    lock.readLock().unlock()
-    return currDelay
+  def getInterval(): FiniteDuration = {
+    interval.copy()
  }
 }
 
@@ -110,31 +106,20 @@
  * queried in the order they're returned from this function.
  */
 class TenantIngestionMetering(settings: FilodbSettings,
-                              dsIterProducer: () => Iterator[DatasetRef],
-                              coordActorProducer: () => ActorRef) extends StrictLogging{
+                              dsIterProducer: () => Iterator[DatasetRef],
+                              coordActorProducer: () => ActorRef) extends StrictLogging {
+  import TenantIngestionMetering._
 
-  // time until first query executes
-  private val SCHED_INIT_DELAY = FiniteDuration(
+  private val clusterType = settings.config.getString("cluster-type")
+  private var queryAskTimeSec = -1L  // unix time (seconds) of the most recent query ask
+  private val queryThrottle = new QueryThrottle(FiniteDuration(
     settings.config.getDuration("metering-query-interval").toSeconds,
-    TimeUnit.SECONDS)
-
-  private val CLUSTER_TYPE = settings.config.getString("cluster-type")
-
-  private val METRIC_ACTIVE = "active_timeseries_by_tenant"
-  private val METRIC_TOTAL = "total_timeseries_by_tenant"
-
-  private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY)
+    TimeUnit.SECONDS))
 
-  def schedulePeriodicPublishJob() : Unit = {
-    // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
-    // does not work. Unsure why, but that's why these FiniteDurations are
-    // awkwardly parsed into seconds.
-    scheduler.scheduleOnce(
-      SCHED_INIT_DELAY.toSeconds,
-      TimeUnit.SECONDS,
-      () => queryAndSchedule())
-  }
+  // immediately begin the periodic query-and-publish cycle for cardinality data
+  queryAndSchedule()
 
+  // scalastyle:off method.length
  /**
   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
  * Schedules:
@@ -143,47 +128,58 @@ class TenantIngestionMetering(settings: FilodbSettings,
   */
  private def queryAndSchedule() : Unit = {
    import filodb.query.exec.TsCardExec._
+
    val groupDepth = 1  // group cardinalities at the second level (i.e. ws & ns)
    val prefix = Nil  // query for cardinalities regardless of first-level name (i.e. ws name)
+
+    // used later to compute the total elapsed time
+    queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond
+
    dsIterProducer().foreach { dsRef =>
      val fut = Client.asyncAsk(
        coordActorProducer(),
        LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
-        queryLimiter.getDelay())
-      fut.onComplete {
-        case Success(qresp) =>
-          queryLimiter.recordOnTime()
-          qresp match {
-            case QueryResult(_, _, rv, _, _, _) =>
-              rv.foreach(_.rows().foreach{ rr =>
-                // publish a cardinality metric for each namespace
-                val data = RowData.fromRowReader(rr)
-                val prefix = data.group.toString.split(PREFIX_DELIM)
-                val tags = Map("metric_ws" -> prefix(0),
-                               "metric_ns" -> prefix(1),
-                               "dataset" -> dsRef.dataset,
-                               "cluster_type" -> CLUSTER_TYPE)
-                Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
-                Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
-              })
-            case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
-          }
-        case Failure(t) =>
-          logger.warn("Failure: " + t.getMessage)
-          if (t.isInstanceOf[TimeoutException]) {
-            queryLimiter.recordTimeout()
-          } else {
-            queryLimiter.recordOnTime()
-          }
-        // required to compile
-        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
+        queryThrottle.getInterval())
+
+      fut.onComplete { tryRes =>
+        tryRes match {
+          case Success(qresp) =>
+            queryThrottle.recordOnTime()
+            qresp match {
+              case QueryResult(_, _, rv, _, _, _) =>
+                rv.foreach(_.rows().foreach{ rr =>
+                  // publish a cardinality metric for each namespace
+                  val data = RowData.fromRowReader(rr)
+                  val prefix = data.group.toString.split(PREFIX_DELIM)
+                  val tags = Map(
+                    "metric_ws" -> prefix(0),
+                    "metric_ns" -> prefix(1),
+                    "dataset" -> dsRef.dataset,
+                    "cluster_type" -> clusterType)
+                  Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+                  Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+                })
+              case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
+            }
+          case Failure(t) =>
+            logger.warn("Failure: " + t.getMessage)
+            if (t.isInstanceOf[TimeoutException]) {
+              queryThrottle.recordTimeout()
+            } else {
+              queryThrottle.recordOnTime()
+            }
+          // defensive default; Success and Failure already cover every Try
+          case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
+        }
+
+        // Delay the next query until the beginning of the next interval.
+        val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
+        scheduler.scheduleOnce(
+          math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
+          TimeUnit.SECONDS,
+          () => queryAndSchedule())
      }
    }
-
-    // schedule the next query
-    scheduler.scheduleOnce(
-      queryLimiter.getDelay().toSeconds,
-      TimeUnit.SECONDS,
-      () => queryAndSchedule())
  }
+  // scalastyle:on method.length
 }
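
Note for reviewers: the sketch below is not FiloDB code; SelfSchedulingSketch, runQuery, and intervalSec are hypothetical placeholders. It illustrates the scheduling pattern this patch adopts: instead of a fixed periodic job, each round's onComplete callback schedules the next round for the start of the next interval, so a slow or throttled query postpones the cycle rather than overlapping it.

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Minimal, self-contained sketch of "onComplete schedules next ask".
// runQuery() stands in for Client.asyncAsk, and intervalSec stands in for
// queryThrottle.getInterval(); neither is part of FiloDB.
object SelfSchedulingSketch extends App {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  implicit private val ec: ExecutionContext = ExecutionContext.global

  private val intervalSec = 10L

  private def runQuery(): Future[String] = Future { "cardinality data" }

  private def queryAndSchedule(): Unit = {
    val askTimeSec = System.currentTimeMillis() / 1000
    runQuery().onComplete { res =>
      res match {
        case Success(data) => println(s"publish: $data")          // update gauges here
        case Failure(t)    => println(s"failed: ${t.getMessage}") // record the timeout here
      }
      // Schedule the next round at the start of the next interval;
      // a round that ran long starts the next one immediately.
      val elapsedSec = System.currentTimeMillis() / 1000 - askTimeSec
      scheduler.schedule(
        new Runnable { def run(): Unit = queryAndSchedule() },
        math.max(0L, intervalSec - elapsedSec),
        TimeUnit.SECONDS)
    }
  }

  queryAndSchedule()
}

Because the delay is recomputed from the current interval at the end of every round, any widening applied by QueryThrottle after repeated timeouts takes effect on the very next ask.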