diff --git a/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala b/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala
new file mode 100644
index 0000000000..e01ce5aa0b
--- /dev/null
+++ b/coordinator/src/main/scala/filodb.coordinator/QueryThrottle.scala
@@ -0,0 +1,74 @@
+package filodb.coordinator
+
+import com.typesafe.scalalogging.StrictLogging
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Throttles the TenantIngestionMetering query rate according to the ratio of timeouts to query attempts.
+ *
+ * @param queryInterval the initial delay between each query.
+ * @param intervalDiff diff added to queryInterval if the ratio of timeouts:queries in the lookback
+ *                     window exceeds timeoutThreshold
+ * @param timeoutThreshold ratio of timeouts:queries at which intervalDiff is added to queryInterval
+ * @param lookback number of queries used to check timeoutThreshold
+ */
+class QueryThrottle(queryInterval: FiniteDuration,
+                    intervalDiff: FiniteDuration,
+                    timeoutThreshold: Double,
+                    lookback: Int) extends StrictLogging {
+
+  private var interval: FiniteDuration = queryInterval.copy()
+
+  // these track timeouts for the past LOOKBACK queries
+  private var bits = 0  // "1" indicates a timeout
+  private var ibit = 0
+
+  /**
+   * Sets the next lookback bit and increments ibit.
+   */
+  private def setNextBit(bit: Boolean): Unit = {
+    val bitVal = if (bit) 1 else 0
+    bits = bits & ~(1 << ibit)      // zero the bit
+    bits = bits | (bitVal << ibit)  // 'or' in the new bit
+    ibit = ibit + 1
+    if (ibit == lookback) {
+      ibit = 0
+    }
+  }
+
+  /**
+   * Updates the interval according to the timeout:non-timeout ratio.
+   */
+  private def updateInterval(): Unit = {
+    val failureRate = Integer.bitCount(bits).toDouble / lookback
+    if (failureRate > timeoutThreshold) {
+      interval = interval + intervalDiff
+      logger.info("too many timeouts; query interval extended to " + interval.toString())
+      // reset the bits
+      bits = 0
+    }
+  }
+
+  /**
+   * Record a query timeout.
+   */
+  def recordTimeout(): Unit = {
+    setNextBit(true)
+    updateInterval()
+  }
+
+  /**
+   * Record a query non-timeout.
+   */
+  def recordOnTime(): Unit = {
+    setNextBit(false)
+    updateInterval()
+  }
+
+  /**
+   * Returns the current query interval.
+   */
+  def getInterval(): FiniteDuration = {
+    interval.copy()
+  }
+}
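A minimal usage sketch of QueryThrottle (illustrative values and a placeholder query function, not part of the patch): the caller records each query outcome and reads getInterval() back as both the ask timeout and the delay before the next query, mirroring how TenantIngestionMetering uses it below.

import java.util.concurrent.{TimeoutException, TimeUnit}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Try}

// Illustrative settings: a 15-minute interval, extended by 5 minutes whenever more
// than 15% of the last 10 recorded queries timed out.
val throttle = new QueryThrottle(
  FiniteDuration(15, TimeUnit.MINUTES),
  FiniteDuration(5, TimeUnit.MINUTES),
  timeoutThreshold = 0.15,
  lookback = 10)

def runQuery(timeout: FiniteDuration): Try[Unit] = ???  // placeholder for an actor ask

runQuery(throttle.getInterval()) match {
  case Failure(_: TimeoutException) => throttle.recordTimeout()  // counts toward the threshold
  case _                            => throttle.recordOnTime()   // success or non-timeout failure
}
val nextDelay: FiniteDuration = throttle.getInterval()  // possibly extended by intervalDiff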
diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
index 119ceea7d8..bea562d0a0 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala
@@ -39,12 +39,10 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
 
   private val _tenantIngestionMeteringOpt =
     if (settings.config.getBoolean("shard-key-level-ingestion-metrics-enabled")) {
-      val inst = TenantIngestionMetering(
+      Some(new TenantIngestionMetering(
        settings,
        () => { _datasetInfo.map{ case (dsRef, _) => dsRef}.toIterator },
-        () => { _coordinators.head._2 })
-      inst.schedulePeriodicPublishJob()
-      Some(inst)
+        () => { _coordinators.head._2 }))
    } else None
 
  val shardReassignmentMinInterval = settings.config.getDuration("shard-manager.reassignment-min-interval")
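ShardManager no longer calls schedulePeriodicPublishJob(): the reworked TenantIngestionMetering (below) starts its query loop from its own constructor, so constructing the instance is sufficient. A sketch of the new lifecycle, with placeholder arguments standing in for the real ShardManager state:

// Hypothetical construction; ShardManager supplies the real settings, dataset
// iterator, and coordinator ActorRef.
val metering = new TenantIngestionMetering(
  settings,
  () => datasetRefs.iterator,   // assumed dataset source
  () => coordinatorActorRef)    // assumed coordinator provider
// No explicit scheduling call: the constructor invokes queryAndSchedule(), which
// re-schedules the next query batch after each round of responses is published.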
diff --git a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
index 37e547bb0a..344c597275 100644
--- a/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/TenantIngestionMetering.scala
@@ -1,21 +1,34 @@
 package filodb.coordinator
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeoutException, TimeUnit}
 
 import scala.concurrent.duration.FiniteDuration
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 import akka.actor.ActorRef
 import com.typesafe.scalalogging.StrictLogging
 import kamon.Kamon
 import kamon.tag.TagSet
+import monix.eval.Task
 import monix.execution.Scheduler.Implicits.{global => scheduler}
+import monix.reactive.Observable
 
 import filodb.coordinator.client.Client
 import filodb.coordinator.client.QueryCommands.LogicalPlan2Query
 import filodb.core.DatasetRef
 import filodb.query.{QueryError, QueryResult, TsCardinalities}
 
+object TenantIngestionMetering {
+  protected val METRIC_ACTIVE = "active_timeseries_by_tenant"
+  protected val METRIC_TOTAL = "total_timeseries_by_tenant"
+  protected val PARALLELISM = 8  // number of datasets queried in parallel
+
+  // QueryThrottle args
+  protected val INTERVAL_DIFF = FiniteDuration(5L, TimeUnit.MINUTES)
+  protected val LOOKBACK = 10
+  protected val TIMEOUT_THRESHOLD = 0.15
+}
+
 /**
  * Periodically queries a node for all namespace cardinalities.
  * Kamon gauges are updated with the response data.
@@ -27,63 +40,91 @@ import filodb.query.{QueryError, QueryResult, TsCardinalities}
  * @param coordActorProducer produces a single actor to ask a query. Actors are
  *                           queried in the order they're returned from this function.
  */
-case class TenantIngestionMetering(settings: FilodbSettings,
-                                   dsIterProducer: () => Iterator[DatasetRef],
-                                   coordActorProducer: () => ActorRef) extends StrictLogging{
-
-  private val ASK_TIMEOUT = FiniteDuration(
-    settings.config.getDuration("metering-query-interval").toSeconds,
-    TimeUnit.SECONDS)
-  private val SCHED_INIT_DELAY = ASK_TIMEOUT // time until first job is scheduled
-  private val SCHED_DELAY = ASK_TIMEOUT // time between all jobs after the first
-
-  private val CLUSTER_TYPE = settings.config.getString("cluster-type")
+class TenantIngestionMetering(settings: FilodbSettings,
+                              dsIterProducer: () => Iterator[DatasetRef],
+                              coordActorProducer: () => ActorRef) extends StrictLogging{
+  import TenantIngestionMetering._
 
-  private val METRIC_ACTIVE = "active_timeseries_by_tenant"
-  private val METRIC_TOTAL = "total_timeseries_by_tenant"
+  private val clusterType = settings.config.getString("cluster-type")
+  private var queryAskTimeSec = -1L  // unix time of the most recent query ask
+  private val queryThrottle = new QueryThrottle(
+    FiniteDuration(settings.config.getDuration("metering-query-interval").toSeconds,
+      TimeUnit.SECONDS),
+    INTERVAL_DIFF,
+    TIMEOUT_THRESHOLD,
+    LOOKBACK)
 
-  def schedulePeriodicPublishJob() : Unit = {
-    // NOTE: the FiniteDuration overload of scheduleWithFixedDelay
-    // does not work. Unsure why, but that's why these FiniteDurations are
-    // awkwardly parsed into seconds.
-    scheduler.scheduleWithFixedDelay(
-      SCHED_INIT_DELAY.toSeconds,
-      SCHED_DELAY.toSeconds,
-      TimeUnit.SECONDS,
-      () => queryAndSchedulePublish())
-  }
+  // immediately begin periodically querying for / publishing cardinality data
+  queryAndSchedule()
 
+  // scalastyle:off method.length
   /**
-   * For each dataset, ask a Coordinator with a TsCardinalities LogicalPlan.
-   * Schedules a job to publish the Coordinator's response.
+   * For each dataset, asks a Coordinator with a TsCardinalities LogicalPlan.
+   * A publish job is scheduled for each response, and the next batch of
+   * queries is scheduled after all responses are processed/published.
    */
-  private def queryAndSchedulePublish() : Unit = {
+  private def queryAndSchedule() : Unit = {
     import filodb.query.exec.TsCardExec._
-    val numGroupByFields = 2  // group cardinalities at the second level (i.e. ws & ns)
-    val prefix = Nil  // query for cardinalities regardless of first-level name (i.e. ws name)
-    dsIterProducer().foreach { dsRef =>
-      val fut = Client.asyncAsk(
-        coordActorProducer(),
-        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields)),
-        ASK_TIMEOUT)
-      fut.onComplete {
-        case Success(QueryResult(_, _, rv, _, _, _)) =>
-          rv.foreach(_.rows().foreach{ rr =>
-            // publish a cardinality metric for each namespace
-            val data = RowData.fromRowReader(rr)
-            val prefix = data.group.toString.split(PREFIX_DELIM)
-            val tags = Map("metric_ws" -> prefix(0),
-                           "metric_ns" -> prefix(1),
-                           "dataset" -> dsRef.dataset,
-                           "cluster_type" -> CLUSTER_TYPE)
-            Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
-            Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
-          })
-        case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
-        case Failure(t) => logger.warn("Failure: " + t.getMessage)
-        // required to compile
-        case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + fut)
+
+    // Nil prefix in order to query all client-owned workspaces;
+    // numGroupByFields = 2 to group by (ws, ns)
+    val tsCardQuery = TsCardinalities(Nil, 2)
+
+    // use this later to find total elapsed time
+    queryAskTimeSec = java.time.Clock.systemUTC().instant().getEpochSecond
+
+    Observable.fromIterator(dsIterProducer()).mapAsync(PARALLELISM){ dsRef =>
+      // Asynchronously query each dataset; store (dsRef, queryResult) pairs
+      Task{
+        val qres = Client.actorAsk(
+          coordActorProducer(),
+          LogicalPlan2Query(dsRef, tsCardQuery),
+          queryThrottle.getInterval()){
+            case t: Try[Any] => t
+          }
+        (dsRef, qres)
+      }
+    }.foreach { case (dsRef, qres) => qres match {
+      // process the query results one-at-a-time (prevents the need for locks in QueryThrottle)
+      case Success(qresp) =>
+        queryThrottle.recordOnTime()
+        qresp match {
+          case QueryResult(_, _, rv, _, _, _) =>
+            rv.foreach(_.rows().foreach { rr =>
+              // publish a cardinality metric for each namespace
+              val data = RowData.fromRowReader(rr)
+              val prefix = data.group.toString.split(PREFIX_DELIM)
+              val tags = Map(
+                "metric_ws" -> prefix(0),
+                "metric_ns" -> prefix(1),
+                "dataset" -> dsRef.dataset,
+                "cluster_type" -> clusterType)
+              Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
+              Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
+            })
+          case QueryError(_, _, t) => logger.warn("QueryError: " + t.getMessage)
+        }
+      case Failure(t) =>
+        logger.warn("Failure: " + t.getMessage)
+        if (t.isInstanceOf[TimeoutException]) {
+          queryThrottle.recordTimeout()
+        } else {
+          queryThrottle.recordOnTime()
+        }
+      // required to compile
+      case _ => throw new IllegalArgumentException("should never reach here; attempted to match: " + qres)
       }
+    }.onComplete { _ =>
+      // Schedule the next query batch at the beginning of the next interval.
+      // Note: this "batch delay" setup is intended to keep the TIM config simple; only metering-query-interval
+      //   needs to be configured. But it assumes the time required to sequentially process each
+      //   response is negligible with respect to metering-query-interval.
+      val elapsedSec = java.time.Clock.systemUTC().instant().getEpochSecond - queryAskTimeSec
+      scheduler.scheduleOnce(
+        math.max(0, queryThrottle.getInterval().toSeconds - elapsedSec),
+        TimeUnit.SECONDS,
+        () => queryAndSchedule())
     }
   }
+  // scalastyle:on method.length
 }
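The fan-out above bounds concurrency with Observable.mapAsync and then processes results one at a time in foreach, which is why QueryThrottle needs no locking. A self-contained sketch of the same pattern (placeholder work instead of actor asks; assumes the Monix version this patch compiles against, where Observable.fromIterator accepts a plain Iterator):

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Up to 8 tasks run concurrently (mirroring PARALLELISM), but foreach sees the
// results sequentially; onComplete fires once every element has been handled.
Observable.fromIterator(Iterator("ds1", "ds2", "ds3"))
  .mapAsync(8) { ds => Task { (ds, ds.length) } }            // placeholder "query"
  .foreach { case (ds, n) => println(s"$ds -> $n") }
  .onComplete { _ => println("batch done; schedule the next one here") }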
diff --git a/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala b/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala
new file mode 100644
index 0000000000..22fc16ca4c
--- /dev/null
+++ b/coordinator/src/test/scala/filodb.coordinator/QueryThrottleSpec.scala
@@ -0,0 +1,73 @@
+package filodb.coordinator
+
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+class QueryThrottleSpec extends AnyFunSpec with Matchers {
+  it("should correctly increment query delay") {
+
+    val INIT_INTERVAL = FiniteDuration(10, TimeUnit.SECONDS)
+    val INTERVAL_DIFF = FiniteDuration(1, TimeUnit.SECONDS)
+
+    // two timeouts are allowed before the interval is incremented
+    val throttle = new QueryThrottle(
+      INIT_INTERVAL,
+      INTERVAL_DIFF,
+      timeoutThreshold=0.67,
+      lookback=3)
+
+    // on-times should not change the interval
+    throttle.getInterval() shouldEqual INIT_INTERVAL
+    throttle.recordOnTime()
+    throttle.getInterval() shouldEqual INIT_INTERVAL
+    throttle.recordOnTime()
+    throttle.getInterval() shouldEqual INIT_INTERVAL
+    throttle.recordOnTime()
+    throttle.getInterval() shouldEqual INIT_INTERVAL
+
+    // use this to track the expected interval
+    var interval = INIT_INTERVAL
+
+    // fail twice
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+
+    // next failure should increment interval
+    interval = interval + INTERVAL_DIFF
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+
+    // failure counter should reset -- fail twice again
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+
+    // next failure should increment the interval
+    interval = interval + INTERVAL_DIFF
+    throttle.recordTimeout()
+    throttle.getInterval() shouldEqual interval
+
+    // success-failure-success-etc should not change the interval
+    throttle.recordTimeout()
+    throttle.recordOnTime()
+    throttle.recordTimeout()
+    throttle.recordOnTime()
+    throttle.recordTimeout()
+    throttle.recordOnTime()
+    throttle.getInterval() shouldEqual interval
+
+    // successes should not reset the counter to its initialized value
+    throttle.recordOnTime()
+    throttle.recordOnTime()
+    throttle.recordOnTime()
+    throttle.recordOnTime()
+    throttle.recordOnTime()
+    throttle.getInterval() shouldEqual interval
+  }
+}
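The spec exercises a small three-query window; with the production defaults defined in TenantIngestionMetering (LOOKBACK = 10, TIMEOUT_THRESHOLD = 0.15, INTERVAL_DIFF = 5 minutes), two timeouts within the window push the failure ratio to 0.2 and extend the interval. A spec-style sketch of that case (the 15-minute starting interval is illustrative, not part of the patch):

val throttle = new QueryThrottle(
  FiniteDuration(15, TimeUnit.MINUTES),   // illustrative initial interval
  FiniteDuration(5, TimeUnit.MINUTES),
  timeoutThreshold = 0.15,
  lookback = 10)

throttle.recordTimeout()                  // 1/10 = 0.10 <= 0.15: interval unchanged
throttle.getInterval() shouldEqual FiniteDuration(15, TimeUnit.MINUTES)
throttle.recordTimeout()                  // 2/10 = 0.20 > 0.15: interval extended
throttle.getInterval() shouldEqual FiniteDuration(20, TimeUnit.MINUTES)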