From f76b59c5871f0bd840b9cfcb75cca229684929dc Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Tue, 14 Dec 2021 13:38:00 -0800
Subject: [PATCH] add TIM query throttle

---
 .../filodb.coordinator/ShardManager.scala     |   2 +-
 .../TenantIngestionMetering.scala             | 146 +++++++++++++++---
 2 files changed, 122 insertions(+), 26 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
index 119ceea7d8..59ac730b9f 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -39,7 +39,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
 
   private val _tenantIngestionMeteringOpt =
     if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-      val inst = TenantIngestionMetering(
+      val inst = new TenantIngestionMetering(
        settings,
        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
        () => { _coordinators.head._2 })
diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
index a78181fc19..25233cc544 100644
--- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -1,6 +1,7 @@
 package filodb.coordinator
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.concurrent.duration.FiniteDuration
 import scala.util.{Failure, Success}
@@ -16,6 +17,83 @@ import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
 import filodb.core.DatasetRef
 import filodb.query.{QueryError, QueryResult, TsCardinalities}
 
+/**
+ * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts.
+ *
+ * @param queryDelay the initial delay between each query. This is the duration to be adjusted.
+ */
+private class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging {
+  // currently just add this diff to the delay when the non-timeout rate drops below THRESHOLD
+  private val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
+  // number of past query timeouts/non-timeouts to consider
+  private val LOOKBACK = 10
+  // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the delay
+  private val THRESHOLD = 0.85
+
+  private var delay: FiniteDuration = queryDelay.copy()
+  private val lock = new ReentrantReadWriteLock()
+
+  // these track timeouts for the past LOOKBACK queries
+  private var bits = (1 << LOOKBACK) - 1
+  private var ibit = 0
+
+  /**
+   * Sets the next lookback bit and increments ibit.
+   */
+  private def setNextBit(bit: Boolean): Unit = {
+    val bitVal = if (bit) 1 else 0
+    bits = bits & ~(1 << ibit)      // zero the bit
+    bits = bits | (bitVal << ibit)  // 'or' in the new bit
+    ibit = ibit + 1
+    if (ibit == LOOKBACK) {
+      ibit = 0
+    }
+  }
+
+  /**
+   * Updates the delay according to the timeout:non-timeout ratio.
+   */
+  private def updateDelay(): Unit = {
+    val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
+    if (successRate < THRESHOLD) {
+      delay = delay + DELAY_DIFF
+      logger.info("too many timeouts; query delay extended to " + delay.toString())
+      // reset the bits
+      bits = (1 << LOOKBACK) - 1
+    }
+  }
+
+  /**
+   * Record a query timeout.
+   */
+  def recordTimeout(): Unit = {
+    lock.writeLock().lock()
+    setNextBit(false)
+    updateDelay()
+    lock.writeLock().unlock()
+  }
+
+  /**
+   * Record a query non-timeout.
+   */
+  def recordOnTime(): Unit = {
+    lock.writeLock().lock()
+    setNextBit(true)
+    updateDelay()
+    lock.writeLock().unlock()
+  }
+
+  /**
+   * Returns the current query delay.
+   */
+  def getDelay(): FiniteDuration = {
+    lock.readLock().lock()
+    val currDelay = delay.copy()
+    lock.readLock().unlock()
+    return currDelay
+  }
+}
+
 /**
  * Periodically queries a node for all namespace cardinalities.
  * Kamon gauges are updated with the response data.
@@ -27,37 +105,39 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities}
  * @param coordActorProducer produces a single actor to ask a query. Actors are
  *   queried in the order they're returned from this function.
  */
-case class TenantIngestionMetering(settings: FilodbSettings,
+class TenantIngestionMetering(settings: FilodbSettings,
                                    dsIterProducer: () => Iterator[DatasetRef],
                                    coordActorProducer: () => ActorRef) extends StrictLogging{
 
-  private val ASK_TIMEOUT = FiniteDuration(
+  // time until first query executes
+  private val SCHED_INIT_DELAY = FiniteDuration(
     settings.config.getDuration("metering-query-interval").toSeconds,
     TimeUnit.SECONDS)
-  private val SCHED_INIT_DELAY = ASK_TIMEOUT  // time until first job is scheduled
-  private val SCHED_DELAY = ASK_TIMEOUT  // time between all jobs after the first
 
   private val CLUSTER_TYPE = settings.config.getString("cluster-type")
 
   private val METRIC_ACTIVE = "active_timeseries_by_tenant"
   private val METRIC_TOTAL = "total_timeseries_by_tenant"
 
+  private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY)
+
   def schedulePeriodicPublishJob() : Unit = {
     // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
     //   does not work. Unsure why, but that's why these FiniteDurations are
     //   awkwardly parsed into seconds.
-    scheduler.scheduleWithFixedDelay(
+    scheduler.scheduleOnce(
       SCHED_INIT_DELAY.toSeconds,
-      SCHED_DELAY.toSeconds,
       TimeUnit.SECONDS,
-      () => queryAndSchedulePublish())
+      () => queryAndSchedule())
   }
 
   /**
    * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
-   * Schedules a job to publish the Coordinator's response.
+   * Schedules:
+   *   (1) a job to publish the Coordinator's response, and
+   *   (2) a job to execute the next query
    */
-  private def queryAndSchedulePublish() : Unit = {
+  private def queryAndSchedule() : Unit = {
     import filodb.query.exec.TsCardExec._
     val groupDepth = 1  // group cardinalities at the second level (i.e. ws & ns)
     val prefix = Nil  // query for cardinalities regardless of first-level name (i.e. ws name)
@@ -65,25 +145,41 @@ case class TenantIngestionMetering(settings: FilodbSettings,
       val fut = Client.asyncAsk(
         coordActorProducer(),
         LogicalPlan2Query(dsRef, TsCardinalities(prefix, groupDepth)),
-        ASK_TIMEOUT)
+        queryLimiter.getDelay())
       fut.onComplete {
-        case Success(QueryResult(_, _, rv, _, _, _)) =>
-          rv.foreach(_.rows().foreach{ rr =>
-            // publish a cardinality metric for each namespace
-            val data = RowData.fromRowReader(rr)
-            val prefix = data.group.toString.split(PREFIX_DELIM)
-            val tags = Map("metric_ws" -> prefix(0),
-                           "metric_ns" -> prefix(1),
-                           "dataset" -> dsRef.dataset,
-                           "cluster_type" -> CLUSTER_TYPE)
-            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
-            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
-          })
-        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
-        case Failure(t) => logger.warn("Failure: " + t.getMessage)
+        case Success(qresp) =>
+          queryLimiter.recordOnTime()
+          qresp match {
+            case QueryResult(_, _, rv, _, _, _) =>
+              rv.foreach(_.rows().foreach{ rr =>
+                // publish a cardinality metric for each namespace
+                val data = RowData.fromRowReader(rr)
+                val prefix = data.group.toString.split(PREFIX_DELIM)
+                val tags = Map("metric_ws" -> prefix(0),
+                               "metric_ns" -> prefix(1),
+                               "dataset" -> dsRef.dataset,
+                               "cluster_type" -> CLUSTER_TYPE)
+                Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+                Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+              })
+            case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
+          }
+        case Failure(t) =>
+          logger.warn("Failure: " + t.getMessage)
+          if (t.isInstanceOf[TimeoutException]) {
+            queryLimiter.recordTimeout()
+          } else {
+            queryLimiter.recordOnTime()
+          }
         // required to compile
         case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
       }
     }
+
+    // schedule the next query
+    scheduler.scheduleOnce(
+      queryLimiter.getDelay().toSeconds,
+      TimeUnit.SECONDS,
+      () => queryAndSchedule())
   }
 }
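
Note: the sketch below is not part of the patch. It is a simplified, single-threaded rendering of the QueryThrottle bookkeeping introduced above, using an assumed 15-minute initial delay; it shows how the LOOKBACK bit window, the THRESHOLD check, and DELAY_DIFF interact: once enough timeouts land in the window to drop the success rate below THRESHOLD, the delay is extended and the window resets.

  // Illustrative sketch only; mirrors QueryThrottle's logic without the
  // ReentrantReadWriteLock, so it is not thread-safe.
  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration.FiniteDuration

  object QueryThrottleSketch extends App {
    val LOOKBACK = 10                                     // number of queries remembered
    val THRESHOLD = 0.85                                  // minimum acceptable success rate
    val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)

    var delay = FiniteDuration(15L, TimeUnit.MINUTES)     // assumed initial delay
    var bits = (1 << LOOKBACK) - 1                        // all ones = no recent timeouts
    var ibit = 0

    def record(onTime: Boolean): Unit = {
      val bitVal = if (onTime) 1 else 0
      bits = (bits & ~(1 << ibit)) | (bitVal << ibit)     // overwrite the oldest slot
      ibit = (ibit + 1) % LOOKBACK
      val successRate = Integer.bitCount(bits).toDouble / LOOKBACK
      if (successRate < THRESHOLD) {
        delay += DELAY_DIFF                               // back off
        bits = (1 << LOOKBACK) - 1                        // reset the window
      }
    }

    // Two timeouts inside the 10-query window drop the success rate to 0.8,
    // which is below THRESHOLD, so the delay is extended exactly once.
    Seq(true, true, false, true, false).foreach(record)
    println(delay)                                        // prints "20 minutes"
  }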