Skip to content

Commit

Permalink
feat(core): Do not index any existing _type_ field since it is a rese…
Browse files Browse the repository at this point in the history
…rved field (#1842)

Indexing of the _type_ field is now configurable per cluster and is disabled (false) by default for now.
Also, if the part-key already contains a _type_ field, we do not index it, since _type_ is a reserved field that we populate ourselves.
  • Loading branch information
vishramachandran authored and sherali42 committed Oct 22, 2024
1 parent 88c3e5d commit 61a5c1f
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
3 changes: 3 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,9 @@ filodb {
# Whether caching on the index is disabled. The underlying Lucene index uses an LRUCache, which is enabled by default;
# this flag lets us disable that feature.
disable-index-caching = false

# Whether to add the _type_ label to all time series for the purpose of filtering
type-field-indexing-enabled = false
}

# for standalone worker cluster configuration, see akka-bootstrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
private val deploymentPartitionName = filodbConfig.getString("deployment-partition-name")

private val downsampleStoreConfig = StoreConfig(filodbConfig.getConfig("downsampler.downsample-store-config"))
private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled")

private val stats = new DownsampledTimeSeriesShardStats(rawDatasetRef, shardNum)

Expand All @@ -102,7 +103,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
private val partKeyIndex: PartKeyIndexDownsampled = new PartKeyLuceneIndex(indexDataset, schemas.part, false,
false, shardNum, indexTtlMs,
downsampleConfig.indexLocation.map(new java.io.File(_)),
indexMetadataStore
indexMetadataStore, addMetricTypeField = typeFieldIndexingEnabled
)

private val indexUpdatedHour = new AtomicLong(0)
Expand Down
32 changes: 24 additions & 8 deletions core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
diskLocation: Option[File] = None,
val lifecycleManager: Option[IndexMetadataStore] = None,
useMemoryMappedImpl: Boolean = true,
disableIndexCaching: Boolean = false
disableIndexCaching: Boolean = false,
addMetricTypeField: Boolean = true
) extends StrictLogging with PartKeyIndexDownsampled {

import PartKeyLuceneIndex._
Expand Down Expand Up @@ -392,8 +393,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
val value = new String(valueBase.asInstanceOf[Array[Byte]],
unsafeOffsetToBytesRefOffset(valueOffset + 2), // add 2 to move past numBytes
UTF8StringMedium.numBytes(valueBase, valueOffset), StandardCharsets.UTF_8)
addIndexedField(key, value)
}
addIndexedField(key, value, clientData = true)
}
}

/**
Expand All @@ -409,7 +410,7 @@ class PartKeyLuceneIndex(ref: DatasetRef,
val numBytes = schema.binSchema.blobNumBytes(base, offset, pos)
val value = new String(base.asInstanceOf[Array[Byte]], strOffset.toInt - UnsafeUtils.arayOffset,
numBytes, StandardCharsets.UTF_8)
addIndexedField(colName.toString, value)
addIndexedField(colName.toString, value, clientData = true)
}
// Intentionally left unimplemented: `???` throws scala.NotImplementedError if this is ever invoked.
def getNamesValues(key: PartitionKey): Seq[(UTF8Str, UTF8Str)] = ??? // not used
}
Expand All @@ -425,7 +426,6 @@ class PartKeyLuceneIndex(ref: DatasetRef,
}
}.toArray


/**
 * Returns the state reported by the lifecycle manager for this index's dataset ref and shard number.
 *
 * @return the pair produced by `lifecycleManager.currentState(ref, shardNum)`, or `(IndexState.Empty, None)`
 *         when no lifecycle manager is configured
 */
def getCurrentIndexState(): (IndexState.Value, Option[Long]) =
  lifecycleManager match {
    case Some(manager) => manager.currentState(this.ref, this.shardNum)
    case None          => (IndexState.Empty, None)
  }

Expand Down Expand Up @@ -619,8 +619,23 @@ class PartKeyLuceneIndex(ref: DatasetRef,
ret
}

private def addIndexedField(labelName: String, value: String): Unit = {
luceneDocument.get().addField(labelName, value)
/**
 * Adds one label/value pair as a field of the document returned by `luceneDocument.get()`.
 *
 * When type-field indexing is enabled (`addMetricTypeField`), a client-supplied label whose name equals
 * `Schemas.TypeLabel` is skipped with a warning instead of being indexed, because that label is reserved
 * and should not be passed in by clients. In every other case the field is added unchanged.
 *
 * @param labelName  name of the label to index
 * @param value      value of the label
 * @param clientData pass true if the field data has come from the metric source, and false if the field
 *                   data is being set internally; only client data is subject to the reserved-label check
 */
private def addIndexedField(labelName: String, value: String, clientData: Boolean): Unit = {
  val isReservedLabelFromClient = clientData && addMetricTypeField && labelName == Schemas.TypeLabel
  if (isReservedLabelFromClient) {
    // The offending part key is not accessible from here, so it cannot be included in the log message.
    // Accepted for now, since the plan of record is to drop reserved labels at the ingestion gateway.
    logger.warn("Map column with name '_type_' is a reserved label. Not indexing it.")
  } else {
    luceneDocument.get().addField(labelName, value)
  }
}

def addPartKey(partKeyOnHeapBytes: Array[Byte],
Expand Down Expand Up @@ -678,7 +693,8 @@ class PartKeyLuceneIndex(ref: DatasetRef,
createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)

val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
addIndexedField(Schemas.TypeLabel, schemaName)
if (addMetricTypeField)
addIndexedField(Schemas.TypeLabel, schemaName, clientData = false)

cforRange { 0 until numPartColumns } { i =>
indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class TimeSeriesShard(val ref: DatasetRef,
private val indexFacetingEnabledAllLabels = filodbConfig.getBoolean("memstore.index-faceting-enabled-for-all-labels")
private val numParallelFlushes = filodbConfig.getInt("memstore.flush-task-parallelism")
private val disableIndexCaching = filodbConfig.getBoolean("memstore.disable-index-caching")
private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled")


/////// END CONFIGURATION FIELDS ///////////////////
Expand Down Expand Up @@ -313,7 +314,8 @@ class TimeSeriesShard(val ref: DatasetRef,
*/
private[memstore] final val partKeyIndex: PartKeyIndexRaw = new PartKeyLuceneIndex(ref, schemas.part,
indexFacetingEnabledAllLabels, indexFacetingEnabledShardKeyLabels, shardNum,
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching)
storeConfig.diskTTLSeconds * 1000, disableIndexCaching = disableIndexCaching,
addMetricTypeField = typeFieldIndexingEnabled)

private val cardTracker: CardinalityTracker = initCardTracker()

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/application_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ filodb {
index-faceting-enabled-shard-key-labels = true
index-faceting-enabled-for-all-labels = true
disable-index-caching = false

type-field-indexing-enabled = true
}

tasks {
Expand Down

0 comments on commit 61a5c1f

Please sign in to comment.