diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index 86c3a5ada..d457a2710 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset, private def materializeSeriesKeysByFilters(qContext: QueryContext, lp: SeriesKeysByFilters, forceInProcess: Boolean): PlanResult = { - // NOTE: _type_ filter support currently isn't there in series keys queries - val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters)) + val renamedFilters = renameMetricFilter(lp.filters) val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) { shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs) diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala index 0f7e72a24..4131753b6 100644 --- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala @@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew memStore.refreshIndexForTesting(dataset1.ref) probe.send(coordinatorActor, GetIndexNames(ref)) - probe.expectMsg(Seq("series")) + probe.expectMsg(Seq("series", "_type_")) probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4)) probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1))) @@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew memStore.refreshIndexForTesting(dataset1.ref) probe.send(coordinatorActor, GetIndexNames(ref)) - probe.expectMsg(Seq("series")) + probe.expectMsg(Seq("series", "_type_")) //actor should restart and serve queries again probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4)) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index eac1ffcf0..493e77712 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -803,6 +803,9 @@ filodb { # Whether caching on index is disabled underlying Lucene index uses LRUCache enabled by default, the flag lets us #disable this feature disable-index-caching = false + + # Whether to add the _type_ label to all time series for the purpose of filtering + type-field-indexing-enabled = false } # for standalone worker cluster configuration, see akka-bootstrapper diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala index 01a1252f7..0ab22c3e0 100644 --- a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala +++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala @@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo], result ++= consumer.stringPairs case (BinaryRecordColumn, i) => result ++= brSchema(i).toStringPairs(blobBase(base, offset, i), blobOffset(base, offset, i)) - result += ("_type_" -> + result += (Schemas.TypeLabel -> Schemas.global.schemaName( RecordSchema.schemaID(blobBase(base, offset, i), blobOffset(base, offset, i)))) diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala index 599c2f51f..54d069c38 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala @@ -83,6 +83,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private val deploymentPartitionName = filodbConfig.getString("deployment-partition-name") private val downsampleStoreConfig = StoreConfig(filodbConfig.getConfig("downsampler.downsample-store-config")) + private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled") private val stats = new DownsampledTimeSeriesShardStats(rawDatasetRef, shardNum) @@ -102,7 +103,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private val partKeyIndex: PartKeyIndexDownsampled = new PartKeyLuceneIndex(indexDataset, schemas.part, false, false, shardNum, indexTtlMs, downsampleConfig.indexLocation.map(new java.io.File(_)), - indexMetadataStore + indexMetadataStore, addMetricTypeField = typeFieldIndexingEnabled ) private val indexUpdatedHour = new AtomicLong(0) @@ -174,7 +175,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => { pair._1.utf8 -> pair._2.utf8 }).toMap ++ - Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) + Map(Schemas.TypeLabel.utf8 -> + Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) } } diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 2cade7d97..1cacce1d3 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -36,10 +36,10 @@ import spire.syntax.cfor._ import filodb.core.{concurrentCache, DatasetRef} import filodb.core.Types.PartitionKey -import filodb.core.binaryrecord2.MapItemConsumer +import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema} import filodb.core.memstore.ratelimit.CardinalityTracker +import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn} -import filodb.core.metadata.PartitionSchema import filodb.core.query.{ColumnFilter, Filter, QueryUtils} import filodb.core.query.Filter._ import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort} @@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, diskLocation: Option[File] = None, val lifecycleManager: Option[IndexMetadataStore] = None, useMemoryMappedImpl: Boolean = true, - disableIndexCaching: Boolean = false + disableIndexCaching: Boolean = false, + addMetricTypeField: Boolean = true ) extends StrictLogging with PartKeyIndexDownsampled { import PartKeyLuceneIndex._ @@ -392,8 +393,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, val value = new String(valueBase.asInstanceOf[Array[Byte]], unsafeOffsetToBytesRefOffset(valueOffset + 2), // add 2 to move past numBytes UTF8StringMedium.numBytes(valueBase, valueOffset), StandardCharsets.UTF_8) - addIndexedField(key, value) - } + addIndexedField(key, value, clientData = true) + } } /** @@ -409,7 +410,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, val numBytes = schema.binSchema.blobNumBytes(base, offset, pos) val value = new String(base.asInstanceOf[Array[Byte]], strOffset.toInt - UnsafeUtils.arayOffset, numBytes, StandardCharsets.UTF_8) - addIndexedField(colName.toString, value) + addIndexedField(colName.toString, value, clientData = true) } def getNamesValues(key: PartitionKey): Seq[(UTF8Str, UTF8Str)] = ??? // not used } @@ -425,7 +426,6 @@ class PartKeyLuceneIndex(ref: DatasetRef, } }.toArray - def getCurrentIndexState(): (IndexState.Value, Option[Long]) = lifecycleManager.map(_.currentState(this.ref, this.shardNum)).getOrElse((IndexState.Empty, None)) @@ -619,8 +619,23 @@ class PartKeyLuceneIndex(ref: DatasetRef, ret } - private def addIndexedField(labelName: String, value: String): Unit = { - luceneDocument.get().addField(labelName, value) + /** + * + * @param clientData pass true if the field data has come from metric source, and false if internally setting the + * field data. This is used to determine if the type field data should be indexed or not. + */ + private def addIndexedField(labelName: String, value: String, clientData: Boolean): Unit = { + if (clientData && addMetricTypeField) { + // do not index any existing _type_ tag since this is reserved and should not be passed in by clients + if (labelName != Schemas.TypeLabel) + luceneDocument.get().addField(labelName, value) + else + logger.warn("Map column with name '_type_' is a reserved label. Not indexing it.") + // I would have liked to log the entire PK to debug, but it is not accessible from here. + // Ignoring for now, since the plan of record is to drop reserved labels at ingestion gateway. + } else { + luceneDocument.get().addField(labelName, value) + } } def addPartKey(partKeyOnHeapBytes: Array[Byte], @@ -677,6 +692,10 @@ class PartKeyLuceneIndex(ref: DatasetRef, // If configured and enabled, Multi-column facets will be created on "partition-schema" columns createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset) + val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset)) + if (addMetricTypeField) + addIndexedField(Schemas.TypeLabel, schemaName, clientData = false) + cforRange { 0 until numPartColumns } { i => indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId) } diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index ab1a73a27..49b465cfa 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -285,6 +285,7 @@ class TimeSeriesShard(val ref: DatasetRef, private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels") private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism") private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching") + private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled") /////// END CONFIGURATION FIELDS /////////////////// @@ -313,7 +314,8 @@ class TimeSeriesShard(val ref: DatasetRef, */ private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part, indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum, - storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching) + storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching, + addMetricTypeField = typeFieldIndexingEnabled) private val cardTracker: CardinalityTracker = initCardTracker() @@ -1853,7 +1855,7 @@ class TimeSeriesShard(val ref: DatasetRef, schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => { pair._1.utf8 -> pair._2.utf8 }).toMap ++ - Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) + Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) } /** diff --git a/core/src/main/scala/filodb.core/metadata/Schemas.scala b/core/src/main/scala/filodb.core/metadata/Schemas.scala index f586cc5af..f486f620e 100644 --- a/core/src/main/scala/filodb.core/metadata/Schemas.scala +++ b/core/src/main/scala/filodb.core/metadata/Schemas.scala @@ -372,6 +372,8 @@ object Schemas extends StrictLogging { import Accumulation._ import Dataset._ + val TypeLabel = "_type_" + val _log = logger val rowKeyIDs = Seq(0) // First or timestamp column is always the row keys diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf index 75132f930..a482034e6 100644 --- a/core/src/test/resources/application_test.conf +++ b/core/src/test/resources/application_test.conf @@ -109,7 +109,7 @@ filodb { index-faceting-enabled-shard-key-labels = true index-faceting-enabled-for-all-labels = true disable-index-caching = false - + type-field-indexing-enabled = true } tasks { diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala index 663cb2b0c..f39368270 100644 --- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala @@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 ) partNums7 should not equal debox.Buffer.empty[Int] + // tests to validate schema ID filter + val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)), + ColumnFilter("_type_", Equals("schemaID:46894".utf8))) + val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end) + partNums8 shouldEqual debox.Buffer(7, 8, 9) + + val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)), + ColumnFilter("_type_", Equals("prom-counter".utf8))) + val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end) + partNums9.length shouldEqual 0 + } it("should fetch part key records from filters correctly") { @@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte keyIndex.refreshReadersBlocking() - keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name") + keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name") keyIndex.indexValues("not_found").toSeq should equal (Nil) val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1)) @@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue) labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++ (0 until 10).map(c => s"infoLabel$c").toSet ++ - Set("_ns_", "_ws_", "_metric_") + Set("_ns_", "_ws_", "_metric_", "_type_") } it("should be able to fetch label values efficiently using facets") { diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala index e5b007287..c9ca67e4d 100644 --- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala @@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref) memStore.numPartitions(dataset1.ref, 0) shouldEqual 10 - memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0))) + memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0))) memStore.latestOffset(dataset1.ref, 0) shouldEqual 0 val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet @@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte splits should have length (2) memStore.indexNames(dataset2.ref, 10).toSet should equal ( - Set(("n", 0), ("series", 0), ("n", 1), ("series", 1))) + Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_",0), ("_type_",1))) val filter = ColumnFilter("n", Filter.Equals("2".utf8)) val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter))) @@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte memStore.refreshIndexForTesting(dataset1.ref) memStore.numPartitions(dataset1.ref, 0) shouldEqual 10 - memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0))) + memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0))) memStore.truncate(dataset1.ref, numShards = 4) diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala index 77c4bf171..473d13546 100644 --- a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala +++ b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala @@ -3,13 +3,14 @@ package filodb.prometheus.ast import scala.util.Try import filodb.core.{query, GlobalConfig} +import filodb.core.metadata.Schemas import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams} import filodb.prometheus.parse.Parser import filodb.query._ object Vectors { val PromMetricLabel = "__name__" - val TypeLabel = "_type_" + val TypeLabel = Schemas.TypeLabel val BucketFilterLabel = "_bucket_" val conf = GlobalConfig.systemConfig val queryConfig = conf.getConfig("filodb.query") diff --git a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala index 0e03d89ea..75ff9b3d7 100644 --- a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala +++ b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala @@ -4,8 +4,8 @@ import remote.RemoteStorage._ import filodb.core.GlobalConfig import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer} +import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType -import filodb.core.metadata.PartitionSchema import filodb.core.query.{QueryUtils, Result => _, _} import filodb.prometheus.parse.Parser.REGEX_MAX_LEN import filodb.query.{QueryResult => FiloQueryResult, _} @@ -230,7 +230,7 @@ object PrometheusModel { def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = { Map("_shards_" -> rvk.sourceShards.mkString(","), "_partIds_" -> rvk.partIds.mkString(","), - "_type_" -> rvk.schemaNames.mkString(",")) + Schemas.TypeLabel -> rvk.schemaNames.mkString(",")) } def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = { diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala index d8aced8cc..313f5a9ca 100644 --- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala @@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader import filodb.core.memstore.ratelimit.CardinalityStore import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore} import filodb.core.metadata.Schemas -import filodb.core.query._ +import filodb.core.query.{ColumnFilter, _} import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore} import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String} import filodb.query._ @@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B result shouldEqual jobQueryResult1 } + it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") { + import ZeroCopyUTF8String._ + val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)), + ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)), + ColumnFilter("job", Filter.Equals("myCoolService".utf8))) + + val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard => + LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, + ishard, filters, Seq("job", "unicode_tag"), now-5000, now) + } + + val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves) + + val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue + val result = (resp: @unchecked) match { + case QueryResult(id, _, response, _, _, _, _) => { + val rv = response(0) + rv.rows.size shouldEqual 1 + val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader] + rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset) + } + } + result shouldEqual jobQueryResult1 + } + + it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") { + import ZeroCopyUTF8String._ + val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)), + ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)), + ColumnFilter("job", Filter.Equals("myCoolService".utf8))) + + val leaves = (0 until shardPartKeyLabelValues.size).map { ishard => + LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, + ishard, filters, Seq("job", "unicode_tag"), now - 5000, now) + } + + val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves) + + val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue + (resp: @unchecked) match { + case QueryResult(id, _, response, _, _, _, _) => { + response.isEmpty shouldEqual true + } + } + } + it("should not return any rows for wrong column filters") { import ZeroCopyUTF8String._ val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)), @@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B } it ("should be able to query labels with filter") { - val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_") + val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_") val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8))) val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard => @@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B "job" -> "1", "instance" -> "2", "_metric_" -> "1", - "_ws_" -> "1") + "_ws_" -> "1", + "_type_" -> "1") } }