From 20459bc4df3a6112a2cb4b0b634d5bc4d35706a8 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Tue, 14 Dec 2021 13:38:00 -0800 Subject: [PATCH 1/4] add TIM query throttle --- .../filodb.coordinator/ShardManager.scala | 2 +- .../TenantIngestionMetering.scala | 150 +++++++++++++++--- 2 files changed, 126 insertions(+), 26 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala index 119ceea7d8..59ac730b9f 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala @@ -39,7 +39,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings, private val _tenantIngestionMeteringOpt = if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) { - val inst = TenantIngestionMetering( + val inst = new TenantIngestionMetering( settings, () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator }, () => { _coordinators.head._2 }) diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala index 37e547bb0a..e65fc02546 100644 --- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala +++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala @@ -1,6 +1,7 @@ package filodb.coordinator -import java.util.concurrent.TimeUnit +import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.concurrent.duration.FiniteDuration import scala.util.{Failure, Success} @@ -16,6 +17,87 @@ import filodb.coordinator.client.QueryCommands.LogicalPlan2Query import filodb.core.DatasetRef import filodb.query.{QueryError, QueryResult, TsCardinalities} +object QueryThrottle { + // currently just add this diff to the delay if the timeout rate exceeds THRESHOLD + val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES) + // number of past query timeouts/non-timeouts to consider + val LOOKBACK = 10 + // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the delay + val THRESHOLD = 0.85 +} + +/** + * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts. + * + * @param queryDelay the initial delay between each query. This is the duration to be adjusted. + */ +class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { + import QueryThrottle._ + + private var delay: FiniteDuration = queryDelay.copy() + private val lock = new ReentrantReadWriteLock() + + // these track timeouts for the past LOOKBACK queries + private var bits = (1 << LOOKBACK) - 1 + private var ibit = 0 + + /** + * Sets the next lookback bit and increments ibit. + */ + private def setNextBit(bit: Boolean): Unit = { + val bitVal = if (bit) 1 else 0 + bits = bits & ~(1 << ibit) // zero the bit + bits = bits | (bitVal << ibit) // 'or' in the new bit + ibit = ibit + 1 + if (ibit == LOOKBACK) { + ibit = 0 + } + } + + /** + * Updates the delay according to the timeout:non-timeoout ratio. + */ + private def updateDelay(): Unit = { + val successRate = Integer.bitCount(bits).toDouble / LOOKBACK + if (successRate < THRESHOLD) { + delay = delay + DELAY_DIFF + logger.info("too many timeouts; query delay extended to " + delay.toString()) + // reset the bits + bits = (1 << LOOKBACK) - 1 + } + } + + /** + * Record a query timeout. 
+ */ + def recordTimeout(): Unit = { + lock.writeLock().lock() + setNextBit(false) + updateDelay() + lock.writeLock().unlock() + } + + /** + * Record a query non-timeout. + */ + def recordOnTime(): Unit = { + lock.writeLock().lock() + setNextBit(true) + updateDelay() + lock.writeLock().unlock() + } + + /** + * Returns the current query delay. + */ + def getDelay(): FiniteDuration = { + lock.readLock().lock() + val currDelay = delay.copy() + lock.readLock().unlock() + return currDelay + } +} + /** * Periodically queries a node for all namespace cardinalities. * Kamon gauges are updated with the response data. @@ -27,37 +109,39 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities} * @param coordActorProducer produces a single actor to ask a query. Actors are * queried in the order they're returned from this function. */ -case class TenantIngestionMetering(settings: FilodbSettings, +class TenantIngestionMetering(settings: FilodbSettings, dsIterProducer: () => Iterator[DatasetRef], coordActorProducer: () => ActorRef) extends StrictLogging{ - private val ASK_TIMEOUT = FiniteDuration( + // time until first query executes + private val SCHED_INIT_DELAY = FiniteDuration( settings.config.getDuration("metering-query-interval").toSeconds, TimeUnit.SECONDS) - private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled - private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first private val CLUSTER_TYPE = settings.config.getString("cluster-type") private val METRIC_ACTIVE = "active_timeseries_by_tenant" private val METRIC_TOTAL = "total_timeseries_by_tenant" + private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY) + def schedulePeriodicPublishJob() : Unit = { // NOTE: the FiniteDuration overload of scheduleWithFixedDelay // does not work. Unsure why, but that's why these FiniteDurations are // awkwardly parsed into seconds. - scheduler.scheduleWithFixedDelay( + scheduler.scheduleOnce( SCHED_INIT_DELAY.toSeconds, - SCHED_DELAY.toSeconds, TimeUnit.SECONDS, - () => queryAndSchedulePublish()) + () => queryAndSchedule()) } /** * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan. - * Schedules a job to publish the Coordinator's response. + * Schedules: + * (1) a job to publish the Coordinator's response, and + * (2) a job to execute the next query */ - private def queryAndSchedulePublish() : Unit = { + private def queryAndSchedule() : Unit = { import filodb.query.exec.TsCardExec._ val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns) val prefix = Nil // query for cardinalities regardless of first-level name (i.e. 
ws name) @@ -65,25 +149,41 @@ case class TenantIngestionMetering(settings: FilodbSettings, val fut = Client.asyncAsk( coordActorProducer(), LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)), - ASK_TIMEOUT) + queryLimiter.getDelay()) fut.onComplete { - case Success(QueryResult(_, _, rv, _, _, _)) => - rv.foreach(_.rows().foreach{ rr => - // publish a cardinality metric for each namespace - val data = RowData.fromRowReader(rr) - val prefix = data.group.toString.split(PREFIX_DELIM) - val tags = Map("metric_ws" -> prefix(0), - "metric_ns" -> prefix(1), - "dataset" -> dsRef.dataset, - "cluster_type" -> CLUSTER_TYPE) - Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) - Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) - }) - case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage) - case Failure(t) => logger.warn("Failure: " + t.getMessage) + case Success(qresp) => + queryLimiter.recordOnTime() + qresp match { + case QueryResult(_, _, rv, _, _, _) => + rv.foreach(_.rows().foreach{ rr => + // publish a cardinality metric for each namespace + val data = RowData.fromRowReader(rr) + val prefix = data.group.toString.split(PREFIX_DELIM) + val tags = Map("metric_ws" -> prefix(0), + "metric_ns" -> prefix(1), + "dataset" -> dsRef.dataset, + "cluster_type" -> CLUSTER_TYPE) + Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) + Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) + }) + case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) + } + case Failure(t) => + logger.warn("Failure: " + t.getMessage) + if (t.isInstanceOf[TimeoutException]) { + queryLimiter.recordTimeout() + } else { + queryLimiter.recordOnTime() + } // required to compile case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) } } + + // schedule the next query + scheduler.scheduleOnce( + queryLimiter.getDelay().toSeconds, + TimeUnit.SECONDS, + () => queryAndSchedule()) } } From 6ad22b408225332a9a675f86e53c17e0ac22804b Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Wed, 22 Dec 2021 15:07:44 -0800 Subject: [PATCH 2/4] onComplete schedules next ask --- .../filodb.coordinator/ShardManager.scala | 6 +- .../TenantIngestionMetering.scala | 150 +++++++++--------- 2 files changed, 79 insertions(+), 77 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala index 59ac730b9f..bea562d0a0 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala @@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings, private val _tenantIngestionMeteringOpt = if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) { - val inst = new TenantIngestionMetering( + Some(new TenantIngestionMetering( settings, () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator }, - () => { _coordinators.head._2 }) - inst.schedulePeriodicPublishJob() - Some(inst) + () => { _coordinators.head._2 })) } else None val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval") diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala 
b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala index e65fc02546..9050d5d6ec 100644 --- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala +++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala @@ -18,23 +18,28 @@ import filodb.core.DatasetRef import filodb.query.{QueryError, QueryResult, TsCardinalities} object QueryThrottle { - // currently just add this diff to the delay if the timeout rate exceeds THRESHOLD - val DELAY_DIFF = FiniteDuration(5L, TimeUnit.MINUTES) + // currently just add this diff to the interval if the timeout rate exceeds THRESHOLD + protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES) // number of past query timeouts/non-timeouts to consider - val LOOKBACK = 10 - // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the delay - val THRESHOLD = 0.85 + protected val LOOKBACK = 10 + // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the interval + protected val THRESHOLD = 0.85 +} + +object TenantIngestionMetering { + protected val METRIC_ACTIVE = "active_timeseries_by_tenant" + protected val METRIC_TOTAL = "total_timeseries_by_tenant" } /** * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts. * - * @param queryDelay the initial delay between each query. This is the duration to be adjusted. + * @param queryInterval the initial delay between each query. This is the duration to be adjusted. */ -class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { +class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging { import QueryThrottle._ - private var delay: FiniteDuration = queryDelay.copy() + private var interval: FiniteDuration = queryInterval.copy() private val lock = new ReentrantReadWriteLock() // these track timeouts for the past LOOKBACK queries @@ -55,13 +60,13 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { } /** - * Updates the delay according to the timeout:non-timeoout ratio. + * Updates the interval according to the timeout:non-timeout ratio. */ - private def updateDelay(): Unit = { + private def updateInterval(): Unit = { val successRate = Integer.bitCount(bits).toDouble / LOOKBACK if (successRate < THRESHOLD) { - delay = delay + DELAY_DIFF - logger.info("too many timeouts; query delay extended to " + delay.toString()) + interval = interval + INTERVAL_DIFF + logger.info("too many timeouts; query interval extended to " + interval.toString()) // reset the bits bits = (1 << LOOKBACK) - 1 } @@ -73,7 +78,7 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { def recordTimeout(): Unit = { lock.writeLock().lock() setNextBit(false) - updateDelay() + updateInterval() lock.writeLock().unlock() } @@ -83,18 +88,18 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { def recordOnTime(): Unit = { lock.writeLock().lock() setNextBit(true) - updateDelay() + updateInterval() lock.writeLock().unlock() } /** - * Returns the current query delay. + * Returns the current query interval. */ - def getDelay(): FiniteDuration = { + def getInterval(): FiniteDuration = { lock.readLock().lock() - val currDelay = delay.copy() + val currInterval = interval.copy() lock.readLock().unlock() - return currDelay + currInterval } } @@ -110,31 +115,20 @@ class QueryThrottle(queryDelay: FiniteDuration) extends StrictLogging { * queried in the order they're returned from this function. 
*/ class TenantIngestionMetering(settings: FilodbSettings, - dsIterProducer: () => Iterator[DatasetRef], - coordActorProducer: () => ActorRef) extends StrictLogging{ + dsIterProducer: () => Iterator[DatasetRef], + coordActorProducer: () => ActorRef) extends StrictLogging{ + import TenantIngestionMetering._ - // time until first query executes - private val SCHED_INIT_DELAY = FiniteDuration( + private val clusterType = settings.config.getString("cluster-type") + private var queryAskTimeSec = -1L // unix time of the most recent query ask + private val queryThrottle = new QueryThrottle(FiniteDuration( settings.config.getDuration("metering-query-interval").toSeconds, - TimeUnit.SECONDS) - - private val CLUSTER_TYPE = settings.config.getString("cluster-type") + TimeUnit.SECONDS)) - private val METRIC_ACTIVE = "active_timeseries_by_tenant" - private val METRIC_TOTAL = "total_timeseries_by_tenant" - - private val queryLimiter = new QueryThrottle(SCHED_INIT_DELAY) - - def schedulePeriodicPublishJob() : Unit = { - // NOTE: the FiniteDuration overload of scheduleWithFixedDelay - // does not work. Unsure why, but that's why these FiniteDurations are - // awkwardly parsed into seconds. - scheduler.scheduleOnce( - SCHED_INIT_DELAY.toSeconds, - TimeUnit.SECONDS, - () => queryAndSchedule()) - } + // immediately begin periodically querying for / publishing cardinality data + queryAndSchedule() + // scalastyle:off method.length /** * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan. * Schedules: @@ -145,45 +139,55 @@ class TenantIngestionMetering(settings: FilodbSettings, import filodb.query.exec.TsCardExec._ val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns) val prefix = Nil // query for cardinalities regardless of first-level name (i.e. 
ws name) + + // use this later to find total elapsed time + queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond + dsIterProducer().foreach { dsRef => val fut = Client.asyncAsk( coordActorProducer(), LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)), - queryLimiter.getDelay()) - fut.onComplete { - case Success(qresp) => - queryLimiter.recordOnTime() - qresp match { - case QueryResult(_, _, rv, _, _, _) => - rv.foreach(_.rows().foreach{ rr => - // publish a cardinality metric for each namespace - val data = RowData.fromRowReader(rr) - val prefix = data.group.toString.split(PREFIX_DELIM) - val tags = Map("metric_ws" -> prefix(0), - "metric_ns" -> prefix(1), - "dataset" -> dsRef.dataset, - "cluster_type" -> CLUSTER_TYPE) - Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) - Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) - }) - case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) - } - case Failure(t) => - logger.warn("Failure: " + t.getMessage) - if (t.isInstanceOf[TimeoutException]) { - queryLimiter.recordTimeout() - } else { - queryLimiter.recordOnTime() - } - // required to compile - case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) + queryThrottle.getInterval()) + + fut.onComplete { tryRes => + tryRes match { + case Success(qresp) => + queryThrottle.recordOnTime() + qresp match { + case QueryResult(_, _, rv, _, _, _) => + rv.foreach(_.rows().foreach{ rr => + // publish a cardinality metric for each namespace + val data = RowData.fromRowReader(rr) + val prefix = data.group.toString.split(PREFIX_DELIM) + val tags = Map( + "metric_ws" -> prefix(0), + "metric_ns" -> prefix(1), + "dataset" -> dsRef.dataset, + "cluster_type" -> clusterType) + Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) + Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) + }) + case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) + } + case Failure(t) => + logger.warn("Failure: " + t.getMessage) + if (t.isInstanceOf[TimeoutException]) { + queryThrottle.recordTimeout() + } else { + queryThrottle.recordOnTime() + } + // required to compile + case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) + } + + // Delay the next query until the beginning of the next interval. 
+          val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
+          scheduler.scheduleOnce(
+            math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
+            TimeUnit.SECONDS,
+            () => queryAndSchedule())
+        }
     }
-
-    // schedule the next query
-    scheduler.scheduleOnce(
-      queryLimiter.getDelay().toSeconds,
-      TimeUnit.SECONDS,
-      () => queryAndSchedule())
   }
+  // scalastyle:on method.length
 }

From 6cb0c28dd796981bc657090cdc87a43da788d380 Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Tue, 4 Jan 2022 13:53:05 -0800
Subject: [PATCH 3/4] remove throttle locks

---
 .../TenantIngestionMetering.scala | 120 +++++++++---------
 1 file changed, 61 insertions(+), 59 deletions(-)

diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
index 9050d5d6ec..2f37b6f99f 100644
--- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -1,16 +1,17 @@
 package filodb.coordinator

 import java.util.concurrent.{TimeoutException, TimeUnit}
-import java.util.concurrent.locks.ReentrantReadWriteLock

 import scala.concurrent.duration.FiniteDuration
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 import akka.actor.ActorRef
 import com.typesafe.scalalogging.StrictLogging
 import kamon.Kamon
 import kamon.tag.TagSet
+import monix.eval.Task
 import monix.execution.Scheduler.Implicits.{global => scheduler}
+import monix.reactive.Observable

 import filodb.coordinator.client.Client
 import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
@@ -29,6 +30,7 @@ object QueryThrottle {
 object TenantIngestionMetering {
   protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
   protected val METRIC_TOTAL = "total_timeseries_by_tenant"
+  protected val PARALLELISM = 8 // number of datasets queried in parallel
 }

 /**
@@ -40,7 +42,6 @@ class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
   import QueryThrottle._

   private var interval: FiniteDuration = queryInterval.copy()
-  private val lock = new ReentrantReadWriteLock()

   // these track timeouts for the past LOOKBACK queries
   private var bits = (1 << LOOKBACK) - 1
   private var ibit = 0
@@ -76,30 +77,23 @@ class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging {
    * Record a query timeout.
    */
   def recordTimeout(): Unit = {
-    lock.writeLock().lock()
     setNextBit(false)
     updateInterval()
-    lock.writeLock().unlock()
   }

   /**
    * Record a query non-timeout.
    */
   def recordOnTime(): Unit = {
-    lock.writeLock().lock()
     setNextBit(true)
     updateInterval()
-    lock.writeLock().unlock()
   }

   /**
    * Returns the current query interval.
    */
   def getInterval(): FiniteDuration = {
-    lock.readLock().lock()
-    val currInterval = interval.copy()
-    lock.readLock().unlock()
-    currInterval
+    interval.copy()
   }
 }

 /**
@@ -130,63 +124,71 @@ class TenantIngestionMetering(settings: FilodbSettings,

   // scalastyle:off method.length
   /**
-   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
-   * Schedules:
-   *   (1) a job to publish the Coordinator's response, and
-   *   (2) a job to execute the next query
+   * For each dataset, asks a Coordinator with a TsCardinalities LogicalPlan.
+   * A publish job is scheduled for each response, and the next batch of
+   * queries is scheduled after all responses are processed/published.
*/ private def queryAndSchedule() : Unit = { import filodb.query.exec.TsCardExec._ - val numGroupByFields = 2 // group cardinalities at the second level (i.e. ws & ns) - val prefix = Nil // query for cardinalities regardless of first-level name (i.e. ws name) + + // Nil prefix in order to query all client-owned workspaces; + // numGroupByFields = 2 to group by (ws, ns) + val tsCardQuery = TsCardinalities(Nil, 2) // use this later to find total elapsed time queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond - dsIterProducer().foreach { dsRef => - val fut = Client.asyncAsk( - coordActorProducer(), - LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)), - queryThrottle.getInterval()) - - fut.onComplete { tryRes => - tryRes match { - case Success(qresp) => - queryThrottle.recordOnTime() - qresp match { - case QueryResult(_, _, rv, _, _, _) => - rv.foreach(_.rows().foreach{ rr => - // publish a cardinality metric for each namespace - val data = RowData.fromRowReader(rr) - val prefix = data.group.toString.split(PREFIX_DELIM) - val tags = Map( - "metric_ws" -> prefix(0), - "metric_ns" -> prefix(1), - "dataset" -> dsRef.dataset, - "cluster_type" -> clusterType) - Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) - Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) - }) - case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) - } - case Failure(t) => - logger.warn("Failure: " + t.getMessage) - if (t.isInstanceOf[TimeoutException]) { - queryThrottle.recordTimeout() - } else { - queryThrottle.recordOnTime() - } - // required to compile - case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut) + Observable.fromIterator(dsIterProducer()).mapAsync(PARALLELISM){ dsRef => + // Asynchronously query each dataset; store (dsRef, queryResult) pairs + Task{ + val qres = Client.actorAsk( + coordActorProducer(), + LogicalPlan2Query(dsRef, tsCardQuery), + queryThrottle.getInterval()){ + case t: Try[Any] => t + } + (dsRef, qres) + } + }.foreach { case (dsRef, qres) => qres match { + // process the query results one-at-a-time (prevents the need for locks in QueryThrottle) + case Success(qresp) => + queryThrottle.recordOnTime() + qresp match { + case QueryResult(_, _, rv, _, _, _) => + rv.foreach(_.rows().foreach { rr => + // publish a cardinality metric for each namespace + val data = RowData.fromRowReader(rr) + val prefix = data.group.toString.split(PREFIX_DELIM) + val tags = Map( + "metric_ws" -> prefix(0), + "metric_ns" -> prefix(1), + "dataset" -> dsRef.dataset, + "cluster_type" -> clusterType) + Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble) + Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble) + }) + case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage) } - - // Delay the next query until the beginning of the next interval. 
- val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec - scheduler.scheduleOnce( - math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec), - TimeUnit.SECONDS, - () => queryAndSchedule()) + case Failure(t) => + logger.warn("Failure: " + t.getMessage) + if (t.isInstanceOf[TimeoutException]) { + queryThrottle.recordTimeout() + } else { + queryThrottle.recordOnTime() + } + // required to compile + case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + qres) } + }.onComplete { _ => + // Schedule the next query batch at the beginning of the next interval. + // Note: this "batch delay" setup is intended to keep the TIM config simple; only metering-query-interval + // needs to be configured. But it assumes the time required to sequentially process each + // response is negligible with respect to metering-query-interval. + val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec + scheduler.scheduleOnce( + math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec), + TimeUnit.SECONDS, + () => queryAndSchedule()) } } // scalastyle:on method.length From df746426534b52dfca85994d2722da4430225475 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Wed, 5 Jan 2022 12:34:30 -0800 Subject: [PATCH 4/4] QueryThrottle to new file / tests --- .../filodb.coordinator/QueryThrottle.scala | 74 ++++++++++++++++ .../TenantIngestionMetering.scala | 85 +++---------------- .../QueryThrottleSpec.scala | 73 ++++++++++++++++ 3 files changed, 157 insertions(+), 75 deletions(-) create mode 100644 coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala create mode 100644 coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala diff --git a/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala b/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala new file mode 100644 index 0000000000..e01ce5aa0b --- /dev/null +++ b/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala @@ -0,0 +1,74 @@ +package filodb.coordinator + +import com.typesafe.scalalogging.StrictLogging +import scala.concurrent.duration.FiniteDuration + +/** + * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to query attempts. + * + * @param queryInterval the initial delay between each query. + * @param intervalDiff diff added to queryInterval if the ratio of timeouts:queries in the lookback + * window exceeds timeoutThreshold + * @param timeoutThreshold ratio of timeouts:queries at which intervalDiff is added to queryInterval + * @param lookback number of queries used to check timeoutThreshold + */ +class QueryThrottle(queryInterval: FiniteDuration, + intervalDiff: FiniteDuration, + timeoutThreshold: Double, + lookback: Int) extends StrictLogging { + + private var interval: FiniteDuration = queryInterval.copy() + + // these track timeouts for the past LOOKBACK queries + private var bits = 0 // "1" indicates a timeout + private var ibit = 0 + + /** + * Sets the next lookback bit and increments ibit. + */ + private def setNextBit(bit: Boolean): Unit = { + val bitVal = if (bit) 1 else 0 + bits = bits & ~(1 << ibit) // zero the bit + bits = bits | (bitVal << ibit) // 'or' in the new bit + ibit = ibit + 1 + if (ibit == lookback) { + ibit = 0 + } + } + + /** + * Updates the interval according to the timeout:non-timeout ratio. 
+ */ + private def updateInterval(): Unit = { + val failureRate = Integer.bitCount(bits).toDouble / lookback + if (failureRate > timeoutThreshold) { + interval = interval + intervalDiff + logger.info("too many timeouts; query interval extended to " + interval.toString()) + // reset the bits + bits = 0 + } + } + + /** + * Record a query timeout. + */ + def recordTimeout(): Unit = { + setNextBit(true) + updateInterval() + } + + /** + * Record a query non-timeout. + */ + def recordOnTime(): Unit = { + setNextBit(false) + updateInterval() + } + + /** + * Returns the current query interval. + */ + def getInterval(): FiniteDuration = { + interval.copy() + } +} diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala index 2f37b6f99f..344c597275 100644 --- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala +++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala @@ -18,83 +18,15 @@ import filodb.coordinator.client.QueryCommands.LogicalPlan2Query import filodb.core.DatasetRef import filodb.query.{QueryError, QueryResult, TsCardinalities} -object QueryThrottle { - // currently just add this diff to the interval if the timeout rate exceeds THRESHOLD - protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES) - // number of past query timeouts/non-timeouts to consider - protected val LOOKBACK = 10 - // {non-timeouts-in-lookback-window} / LOOKBACK < THRESHOLD will adjust the interval - protected val THRESHOLD = 0.85 -} - object TenantIngestionMetering { protected val METRIC_ACTIVE = "active_timeseries_by_tenant" protected val METRIC_TOTAL = "total_timeseries_by_tenant" protected val PARALLELISM = 8 // number of datasets queried in parallel -} - -/** - * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to non-timeouts. - * - * @param queryInterval the initial delay between each query. This is the duration to be adjusted. - */ -class QueryThrottle(queryInterval: FiniteDuration) extends StrictLogging { - import QueryThrottle._ - - private var interval: FiniteDuration = queryInterval.copy() - - // these track timeouts for the past LOOKBACK queries - private var bits = (1 << LOOKBACK) - 1 - private var ibit = 0 - - /** - * Sets the next lookback bit and increments ibit. - */ - private def setNextBit(bit: Boolean): Unit = { - val bitVal = if (bit) 1 else 0 - bits = bits & ~(1 << ibit) // zero the bit - bits = bits | (bitVal << ibit) // 'or' in the new bit - ibit = ibit + 1 - if (ibit == LOOKBACK) { - ibit = 0 - } - } - - /** - * Updates the interval according to the timeout:non-timeout ratio. - */ - private def updateInterval(): Unit = { - val successRate = Integer.bitCount(bits).toDouble / LOOKBACK - if (successRate < THRESHOLD) { - interval = interval + INTERVAL_DIFF - logger.info("too many timeouts; query interval extended to " + interval.toString()) - // reset the bits - bits = (1 << LOOKBACK) - 1 - } - } - /** - * Record a query timeout. - */ - def recordTimeout(): Unit = { - setNextBit(false) - updateInterval() - } - - /** - * Record a query non-timeout. - */ - def recordOnTime(): Unit = { - setNextBit(true) - updateInterval() - } - - /** - * Returns the current query interval. 
- */ - def getInterval(): FiniteDuration = { - interval.copy() - } + // QueryThrottle args + protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES) + protected val LOOKBACK = 10 + protected val TIMEOUT_THRESHOLD = 0.15 } /** @@ -115,9 +47,12 @@ class TenantIngestionMetering(settings: FilodbSettings, private val clusterType = settings.config.getString("cluster-type") private var queryAskTimeSec = -1L // unix time of the most recent query ask - private val queryThrottle = new QueryThrottle(FiniteDuration( - settings.config.getDuration("metering-query-interval").toSeconds, - TimeUnit.SECONDS)) + private val queryThrottle = new QueryThrottle( + FiniteDuration(settings.config.getDuration("metering-query-interval").toSeconds, + TimeUnit.SECONDS), + INTERVAL_DIFF, + TIMEOUT_THRESHOLD, + LOOKBACK) // immediately begin periodically querying for / publishing cardinality data queryAndSchedule() diff --git a/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala b/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala new file mode 100644 index 0000000000..22fc16ca4c --- /dev/null +++ b/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala @@ -0,0 +1,73 @@ +package filodb.coordinator + +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +class QueryThrottleSpec extends AnyFunSpec with Matchers { + it("should correctly increment query delay") { + + val INIT_INTERVAL = FiniteDuration(10, TimeUnit.SECONDS) + val INTERVAL_DIFF = FiniteDuration(1, TimeUnit.SECONDS) + + // two timeouts are allowed before the interval is incremented + val throttle = new QueryThrottle( + INIT_INTERVAL, + INTERVAL_DIFF, + timeoutThreshold=0.67, + lookback=3) + + // on-times should not change the interval + throttle.getInterval() shouldEqual INIT_INTERVAL + throttle.recordOnTime() + throttle.getInterval() shouldEqual INIT_INTERVAL + throttle.recordOnTime() + throttle.getInterval() shouldEqual INIT_INTERVAL + throttle.recordOnTime() + throttle.getInterval() shouldEqual INIT_INTERVAL + + // use this to trach expected interval + var interval = INIT_INTERVAL + + // fail twice + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + + // next failure should increment interval + interval = interval + INTERVAL_DIFF + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + + // failure counter should reset-- fail twice again + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + + // next failure should increment the interval + interval = interval + INTERVAL_DIFF + throttle.recordTimeout() + throttle.getInterval() shouldEqual interval + + // success-failure-success-etc should not change the interval + throttle.recordTimeout() + throttle.recordOnTime() + throttle.recordTimeout() + throttle.recordOnTime() + throttle.recordTimeout() + throttle.recordOnTime() + throttle.getInterval() shouldEqual interval + + // successes should not reset the counter to its initialized value + throttle.recordOnTime() + throttle.recordOnTime() + throttle.recordOnTime() + throttle.recordOnTime() + throttle.recordOnTime() + throttle.getInterval() shouldEqual interval + } +}