From 81cde60a28ccccac66d31de0fe76e041c58fdf24 Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Mon, 29 Jul 2024 13:51:09 -0700 Subject: [PATCH 1/2] feat(core): Now metadata queries support _type_ filter (#1819) Current behavior : Metadata queries do not support filtering based on metric type using _type_ filter New behavior : Added support for that feature. Note that only new documents of persisted downsample index will have the type field. Index needs to be rebuilt if full support is needed. Can be skipped if metadata queries don't hit downsample index. --- .../queryplanner/SingleClusterPlanner.scala | 3 +- .../NodeCoordinatorActorSpec.scala | 4 +- .../binaryrecord2/RecordSchema.scala | 2 +- .../DownsampledTimeSeriesShard.scala | 3 +- .../memstore/PartKeyLuceneIndex.scala | 7 ++- .../memstore/TimeSeriesShard.scala | 2 +- .../scala/filodb.core/metadata/Schemas.scala | 2 + .../memstore/PartKeyLuceneIndexSpec.scala | 15 +++++- .../memstore/TimeSeriesMemStoreSpec.scala | 6 +-- .../scala/filodb/prometheus/ast/Vectors.scala | 3 +- .../prometheus/query/PrometheusModel.scala | 4 +- .../filodb/query/exec/MetadataExecSpec.scala | 53 +++++++++++++++++-- 12 files changed, 84 insertions(+), 20 deletions(-) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index 86c3a5ada..d457a2710 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -1028,8 +1028,7 @@ class SingleClusterPlanner(val dataset: Dataset, private def materializeSeriesKeysByFilters(qContext: QueryContext, lp: SeriesKeysByFilters, forceInProcess: Boolean): PlanResult = { - // NOTE: _type_ filter support currently isn't there in series keys queries - val (renamedFilters, _) = extractSchemaFilter(renameMetricFilter(lp.filters)) + val renamedFilters = renameMetricFilter(lp.filters) val shardsToHit = if (canGetShardsFromFilters(renamedFilters, qContext)) { shardsFromFilters(renamedFilters, qContext, lp.startMs, lp.endMs) diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala index 0f7e72a24..4131753b6 100644 --- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala @@ -329,7 +329,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew memStore.refreshIndexForTesting(dataset1.ref) probe.send(coordinatorActor, GetIndexNames(ref)) - probe.expectMsg(Seq("series")) + probe.expectMsg(Seq("series", "_type_")) probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4)) probe.expectMsg(Seq(("Series 0", 1), ("Series 1", 1), ("Series 2", 1), ("Series 3", 1))) @@ -343,7 +343,7 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew memStore.refreshIndexForTesting(dataset1.ref) probe.send(coordinatorActor, GetIndexNames(ref)) - probe.expectMsg(Seq("series")) + probe.expectMsg(Seq("series", "_type_")) //actor should restart and serve queries again probe.send(coordinatorActor, GetIndexValues(ref, "series", 0, limit=4)) diff --git a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala index 01a1252f7..0ab22c3e0 100644 --- a/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala +++ b/core/src/main/scala/filodb.core/binaryrecord2/RecordSchema.scala @@ -279,7 +279,7 @@ final class RecordSchema(val columns: Seq[ColumnInfo], result ++= consumer.stringPairs case (BinaryRecordColumn, i) => result ++= brSchema(i).toStringPairs(blobBase(base, offset, i), blobOffset(base, offset, i)) - result += ("_type_" -> + result += (Schemas.TypeLabel -> Schemas.global.schemaName( RecordSchema.schemaID(blobBase(base, offset, i), blobOffset(base, offset, i)))) diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala index 599c2f51f..0ecf88fb4 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala @@ -174,7 +174,8 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => { pair._1.utf8 -> pair._2.utf8 }).toMap ++ - Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) + Map(Schemas.TypeLabel.utf8 -> + Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) } } diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 2cade7d97..84a251ad5 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -36,10 +36,10 @@ import spire.syntax.cfor._ import filodb.core.{concurrentCache, DatasetRef} import filodb.core.Types.PartitionKey -import filodb.core.binaryrecord2.MapItemConsumer +import filodb.core.binaryrecord2.{MapItemConsumer, RecordSchema} import filodb.core.memstore.ratelimit.CardinalityTracker +import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn} -import filodb.core.metadata.PartitionSchema import filodb.core.query.{ColumnFilter, Filter, QueryUtils} import filodb.core.query.Filter._ import filodb.memory.{BinaryRegionLarge, UTF8StringMedium, UTF8StringShort} @@ -677,6 +677,9 @@ class PartKeyLuceneIndex(ref: DatasetRef, // If configured and enabled, Multi-column facets will be created on "partition-schema" columns createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset) + val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset)) + addIndexedField(Schemas.TypeLabel, schemaName) + cforRange { 0 until numPartColumns } { i => indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId) } diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index ab1a73a27..8b8e3593e 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -1853,7 +1853,7 @@ class TimeSeriesShard(val ref: DatasetRef, schemas.part.binSchema.toStringPairs(partKey.base, partKey.offset).map(pair => { pair._1.utf8 -> pair._2.utf8 }).toMap ++ - Map("_type_".utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) + Map(Schemas.TypeLabel.utf8 -> Schemas.global.schemaName(RecordSchema.schemaID(partKey.base, partKey.offset)).utf8) } /** diff --git a/core/src/main/scala/filodb.core/metadata/Schemas.scala b/core/src/main/scala/filodb.core/metadata/Schemas.scala index f586cc5af..f486f620e 100644 --- a/core/src/main/scala/filodb.core/metadata/Schemas.scala +++ b/core/src/main/scala/filodb.core/metadata/Schemas.scala @@ -372,6 +372,8 @@ object Schemas extends StrictLogging { import Accumulation._ import Dataset._ + val TypeLabel = "_type_" + val _log = logger val rowKeyIDs = Seq(0) // First or timestamp column is always the row keys diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala index 663cb2b0c..f39368270 100644 --- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala @@ -97,6 +97,17 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte val partNums7 = keyIndex.partIdsFromFilters(Seq(filter7), (start + end)/2, end + 1000 ) partNums7 should not equal debox.Buffer.empty[Int] + // tests to validate schema ID filter + val filter8 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)), + ColumnFilter("_type_", Equals("schemaID:46894".utf8))) + val partNums8 = keyIndex.partIdsFromFilters(filter8, start, end) + partNums8 shouldEqual debox.Buffer(7, 8, 9) + + val filter9 = Seq( ColumnFilter("Actor2Code", Equals("GOV".utf8)), + ColumnFilter("_type_", Equals("prom-counter".utf8))) + val partNums9 = keyIndex.partIdsFromFilters(filter9, start, end) + partNums9.length shouldEqual 0 + } it("should fetch part key records from filters correctly") { @@ -467,7 +478,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte keyIndex.refreshReadersBlocking() - keyIndex.indexNames(10).toList shouldEqual Seq("Actor2Code", "Actor2Name") + keyIndex.indexNames(10).toList shouldEqual Seq("_type_", "Actor2Code", "Actor2Name") keyIndex.indexValues("not_found").toSeq should equal (Nil) val infos = Seq("AFR", "CHN", "COP", "CVL", "EGYEDU").map(_.utf8).map(TermInfo(_, 1)) @@ -602,7 +613,7 @@ class PartKeyLuceneIndexSpec extends AnyFunSpec with Matchers with BeforeAndAfte val labelValues1 = index3.labelNamesEfficient(filters1, 0, Long.MaxValue) labelValues1.toSet shouldEqual (0 until 5).map(c => s"dynamicLabel$c").toSet ++ (0 until 10).map(c => s"infoLabel$c").toSet ++ - Set("_ns_", "_ws_", "_metric_") + Set("_ns_", "_ws_", "_metric_", "_type_") } it("should be able to fetch label values efficiently using facets") { diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala index e5b007287..c9ca67e4d 100644 --- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala @@ -66,7 +66,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset1.ref) memStore.numPartitions(dataset1.ref, 0) shouldEqual 10 - memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0))) + memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0))) memStore.latestOffset(dataset1.ref, 0) shouldEqual 0 val minSet = rawData.map(_(1).asInstanceOf[Double]).toSet @@ -202,7 +202,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte splits should have length (2) memStore.indexNames(dataset2.ref, 10).toSet should equal ( - Set(("n", 0), ("series", 0), ("n", 1), ("series", 1))) + Set(("n", 0), ("series", 0), ("n", 1), ("series", 1), ("_type_",0), ("_type_",1))) val filter = ColumnFilter("n", Filter.Equals("2".utf8)) val agg1 = memStore.scanRows(dataset2, Seq(1), FilteredPartitionScan(splits.head, Seq(filter))) @@ -465,7 +465,7 @@ class TimeSeriesMemStoreSpec extends AnyFunSpec with Matchers with BeforeAndAfte memStore.refreshIndexForTesting(dataset1.ref) memStore.numPartitions(dataset1.ref, 0) shouldEqual 10 - memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0))) + memStore.indexNames(dataset1.ref, 10).toSeq should equal (Seq(("series", 0), ("_type_",0))) memStore.truncate(dataset1.ref, numShards = 4) diff --git a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala index 77c4bf171..473d13546 100644 --- a/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala +++ b/prometheus/src/main/scala/filodb/prometheus/ast/Vectors.scala @@ -3,13 +3,14 @@ package filodb.prometheus.ast import scala.util.Try import filodb.core.{query, GlobalConfig} +import filodb.core.metadata.Schemas import filodb.core.query.{ColumnFilter, QueryUtils, RangeParams} import filodb.prometheus.parse.Parser import filodb.query._ object Vectors { val PromMetricLabel = "__name__" - val TypeLabel = "_type_" + val TypeLabel = Schemas.TypeLabel val BucketFilterLabel = "_bucket_" val conf = GlobalConfig.systemConfig val queryConfig = conf.getConfig("filodb.query") diff --git a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala index 0e03d89ea..75ff9b3d7 100644 --- a/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala +++ b/prometheus/src/main/scala/filodb/prometheus/query/PrometheusModel.scala @@ -4,8 +4,8 @@ import remote.RemoteStorage._ import filodb.core.GlobalConfig import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer} +import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType -import filodb.core.metadata.PartitionSchema import filodb.core.query.{QueryUtils, Result => _, _} import filodb.prometheus.parse.Parser.REGEX_MAX_LEN import filodb.query.{QueryResult => FiloQueryResult, _} @@ -230,7 +230,7 @@ object PrometheusModel { def makeVerboseLabels(rvk: RangeVectorKey): Map[String, String] = { Map("_shards_" -> rvk.sourceShards.mkString(","), "_partIds_" -> rvk.partIds.mkString(","), - "_type_" -> rvk.schemaNames.mkString(",")) + Schemas.TypeLabel -> rvk.schemaNames.mkString(",")) } def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = { diff --git a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala index d8aced8cc..313f5a9ca 100644 --- a/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala +++ b/query/src/test/scala/filodb/query/exec/MetadataExecSpec.scala @@ -16,7 +16,7 @@ import filodb.core.binaryrecord2.BinaryRecordRowReader import filodb.core.memstore.ratelimit.CardinalityStore import filodb.core.memstore.{FixedMaxPartitionsEvictionPolicy, SomeData, TimeSeriesMemStore} import filodb.core.metadata.Schemas -import filodb.core.query._ +import filodb.core.query.{ColumnFilter, _} import filodb.core.store.{ChunkSource, InMemoryMetaStore, NullColumnStore} import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String} import filodb.query._ @@ -160,6 +160,52 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B result shouldEqual jobQueryResult1 } + it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label") { + import ZeroCopyUTF8String._ + val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)), + ColumnFilter("_type_", Filter.Equals(Schemas.promCounter.name.utf8)), + ColumnFilter("job", Filter.Equals("myCoolService".utf8))) + + val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard => + LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, + ishard, filters, Seq("job", "unicode_tag"), now-5000, now) + } + + val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves) + + val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue + val result = (resp: @unchecked) match { + case QueryResult(id, _, response, _, _, _, _) => { + val rv = response(0) + rv.rows.size shouldEqual 1 + val record = rv.rows.next().asInstanceOf[BinaryRecordRowReader] + rv.asInstanceOf[SerializedRangeVector].schema.toStringPairs(record.recordBase, record.recordOffset) + } + } + result shouldEqual jobQueryResult1 + } + + it ("should read the job names from timeseriesindex matching the columnfilters with _type_ label empty result") { + import ZeroCopyUTF8String._ + val filters = Seq(ColumnFilter("_metric_", Filter.Equals("http_req_total".utf8)), + ColumnFilter("_type_", Filter.Equals(Schemas.promHistogram.name.utf8)), + ColumnFilter("job", Filter.Equals("myCoolService".utf8))) + + val leaves = (0 until shardPartKeyLabelValues.size).map { ishard => + LabelValuesExec(QueryContext(), executeDispatcher, timeseriesDatasetMultipleShardKeys.ref, + ishard, filters, Seq("job", "unicode_tag"), now - 5000, now) + } + + val execPlan = LabelValuesDistConcatExec(QueryContext(), executeDispatcher, leaves) + + val resp = execPlan.execute(memStore, querySession).runToFuture.futureValue + (resp: @unchecked) match { + case QueryResult(id, _, response, _, _, _, _) => { + response.isEmpty shouldEqual true + } + } + } + it("should not return any rows for wrong column filters") { import ZeroCopyUTF8String._ val filters = Seq (ColumnFilter("__name__", Filter.Equals("http_req_total1".utf8)), @@ -276,7 +322,7 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B } it ("should be able to query labels with filter") { - val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_") + val expectedLabels = Set("job", "_metric_", "unicode_tag", "instance", "_ws_", "_ns_", "_type_") val filters = Seq (ColumnFilter("job", Filter.Equals("myCoolService".utf8))) val leaves = (0 until shardPartKeyLabelValues.size).map{ ishard => @@ -355,7 +401,8 @@ class MetadataExecSpec extends AnyFunSpec with Matchers with ScalaFutures with B "job" -> "1", "instance" -> "2", "_metric_" -> "1", - "_ws_" -> "1") + "_ws_" -> "1", + "_type_" -> "1") } } From f7182b928be476cd2bc7b04010aceeb39df981de Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Thu, 5 Sep 2024 12:32:16 -0700 Subject: [PATCH 2/2] feat(core): Do not index any existing _type_ field since it is a reserved field (#1842) Addition of _type_ field to index is now configurable for each cluster, false by default for now. Also, if the part-key already has the _type_ field we don't index that since it is a reserved field that we populate. --- core/src/main/resources/filodb-defaults.conf | 3 ++ .../DownsampledTimeSeriesShard.scala | 3 +- .../memstore/PartKeyLuceneIndex.scala | 32 ++++++++++++++----- .../memstore/TimeSeriesShard.scala | 4 ++- core/src/test/resources/application_test.conf | 2 +- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index eac1ffcf0..493e77712 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -803,6 +803,9 @@ filodb { # Whether caching on index is disabled underlying Lucene index uses LRUCache enabled by default, the flag lets us #disable this feature disable-index-caching = false + + # Whether to add the _type_ label to all time series for the purpose of filtering + type-field-indexing-enabled = false } # for standalone worker cluster configuration, see akka-bootstrapper diff --git a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala index 0ecf88fb4..54d069c38 100644 --- a/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/downsample/DownsampledTimeSeriesShard.scala @@ -83,6 +83,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private val deploymentPartitionName = filodbConfig.getString("deployment-partition-name") private val downsampleStoreConfig = StoreConfig(filodbConfig.getConfig("downsampler.downsample-store-config")) + private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled") private val stats = new DownsampledTimeSeriesShardStats(rawDatasetRef, shardNum) @@ -102,7 +103,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef, private val partKeyIndex: PartKeyIndexDownsampled = new PartKeyLuceneIndex(indexDataset, schemas.part, false, false, shardNum, indexTtlMs, downsampleConfig.indexLocation.map(new java.io.File(_)), - indexMetadataStore + indexMetadataStore, addMetricTypeField = typeFieldIndexingEnabled ) private val indexUpdatedHour = new AtomicLong(0) diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala index 84a251ad5..1cacce1d3 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala @@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, diskLocation: Option[File] = None, val lifecycleManager: Option[IndexMetadataStore] = None, useMemoryMappedImpl: Boolean = true, - disableIndexCaching: Boolean = false + disableIndexCaching: Boolean = false, + addMetricTypeField: Boolean = true ) extends StrictLogging with PartKeyIndexDownsampled { import PartKeyLuceneIndex._ @@ -392,8 +393,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, val value = new String(valueBase.asInstanceOf[Array[Byte]], unsafeOffsetToBytesRefOffset(valueOffset + 2), // add 2 to move past numBytes UTF8StringMedium.numBytes(valueBase, valueOffset), StandardCharsets.UTF_8) - addIndexedField(key, value) - } + addIndexedField(key, value, clientData = true) + } } /** @@ -409,7 +410,7 @@ class PartKeyLuceneIndex(ref: DatasetRef, val numBytes = schema.binSchema.blobNumBytes(base, offset, pos) val value = new String(base.asInstanceOf[Array[Byte]], strOffset.toInt - UnsafeUtils.arayOffset, numBytes, StandardCharsets.UTF_8) - addIndexedField(colName.toString, value) + addIndexedField(colName.toString, value, clientData = true) } def getNamesValues(key: PartitionKey): Seq[(UTF8Str, UTF8Str)] = ??? // not used } @@ -425,7 +426,6 @@ class PartKeyLuceneIndex(ref: DatasetRef, } }.toArray - def getCurrentIndexState(): (IndexState.Value, Option[Long]) = lifecycleManager.map(_.currentState(this.ref, this.shardNum)).getOrElse((IndexState.Empty, None)) @@ -619,8 +619,23 @@ class PartKeyLuceneIndex(ref: DatasetRef, ret } - private def addIndexedField(labelName: String, value: String): Unit = { - luceneDocument.get().addField(labelName, value) + /** + * + * @param clientData pass true if the field data has come from metric source, and false if internally setting the + * field data. This is used to determine if the type field data should be indexed or not. + */ + private def addIndexedField(labelName: String, value: String, clientData: Boolean): Unit = { + if (clientData && addMetricTypeField) { + // do not index any existing _type_ tag since this is reserved and should not be passed in by clients + if (labelName != Schemas.TypeLabel) + luceneDocument.get().addField(labelName, value) + else + logger.warn("Map column with name '_type_' is a reserved label. Not indexing it.") + // I would have liked to log the entire PK to debug, but it is not accessible from here. + // Ignoring for now, since the plan of record is to drop reserved labels at ingestion gateway. + } else { + luceneDocument.get().addField(labelName, value) + } } def addPartKey(partKeyOnHeapBytes: Array[Byte], @@ -678,7 +693,8 @@ class PartKeyLuceneIndex(ref: DatasetRef, createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset) val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset)) - addIndexedField(Schemas.TypeLabel, schemaName) + if (addMetricTypeField) + addIndexedField(Schemas.TypeLabel, schemaName, clientData = false) cforRange { 0 until numPartColumns } { i => indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId) diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 8b8e3593e..49b465cfa 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -285,6 +285,7 @@ class TimeSeriesShard(val ref: DatasetRef, private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels") private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism") private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching") + private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled") /////// END CONFIGURATION FIELDS /////////////////// @@ -313,7 +314,8 @@ class TimeSeriesShard(val ref: DatasetRef, */ private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part, indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum, - storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching) + storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching, + addMetricTypeField = typeFieldIndexingEnabled) private val cardTracker: CardinalityTracker = initCardTracker() diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf index 75132f930..a482034e6 100644 --- a/core/src/test/resources/application_test.conf +++ b/core/src/test/resources/application_test.conf @@ -109,7 +109,7 @@ filodb { index-faceting-enabled-shard-key-labels = true index-faceting-enabled-for-all-labels = true disable-index-caching = false - + type-field-indexing-enabled = true } tasks {