Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Remove FileWatcher from KNN (#2182) #2225

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add support to build vector data structures greedily and perform exact search when there are no engine files [#1942](https://github.com/opensearch-project/k-NN/issues/1942)
* Add CompressionLevel Calculation for PQ [#2200](https://github.com/opensearch-project/k-NN/pull/2200)
* Introduce a loading layer in native engine [#2185](https://github.com/opensearch-project/k-NN/pull/2185)
* Remove the FSDirectory dependency from the native engine construction path and deprecate FileWatcher [#2182](https://github.com/opensearch-project/k-NN/pull/2182)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
* KNN80DocValues should only be considered for BinaryDocValues fields [#2147](https://github.com/opensearch-project/k-NN/pull/2147)
Expand Down
51 changes: 28 additions & 23 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
Expand All @@ -29,9 +30,7 @@
import org.opensearch.knn.index.engine.KNNEngine;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -94,14 +93,18 @@ public void warmup() throws IOException {
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
try {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
directory,
engineFileContext.getIndexPath(),
cacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(
engineFileContext.getSpaceType(),
KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()),
KNNEngine.getEngineNameFromPath(engineFileContext.getVectorFileName()),
getIndexName(),
engineFileContext.getVectorDataType()
),
Expand Down Expand Up @@ -133,9 +136,13 @@ public void clearCache() {
indexAllocation.writeLock();
log.info("[KNN] Evicting index from cache: [{}]", indexName);
try (Engine.Searcher searcher = indexShard.acquireSearcher(INDEX_SHARD_CLEAR_CACHE_SEARCHER)) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach(
(engineFileContext) -> nativeMemoryCacheManager.invalidate(engineFileContext.getIndexPath())
);
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.invalidate(cacheKey);
});
} catch (IOException ex) {
log.error("[KNN] Failed to evict index from cache: [{}]", indexName, ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -166,7 +173,6 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader());
Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory();
String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile()
? knnEngine.getCompoundExtension()
: knnEngine.getExtension();
Expand All @@ -180,11 +186,9 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine
String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null);
engineFiles.addAll(
getEngineFileContexts(
reader.getSegmentInfo().files(),
reader.getSegmentInfo().info.name,
reader.getSegmentInfo(),
fieldInfo.name,
fileExtension,
shardPath,
spaceType,
modelId,
FieldInfoExtractor.extractQuantizationConfig(fieldInfo) == QuantizationConfig.EMPTY
Expand All @@ -202,22 +206,22 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

@VisibleForTesting
List<EngineFileContext> getEngineFileContexts(
Collection<String> files,
String segmentName,
SegmentCommitInfo segmentCommitInfo,
String fieldName,
String fileExtension,
Path shardPath,
SpaceType spaceType,
String modelId,
VectorDataType vectorDataType
) {
String prefix = buildEngineFilePrefix(segmentName);
String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return files.stream()
) throws IOException {
// Ex: 0_
final String prefix = buildEngineFilePrefix(segmentCommitInfo.info.name);
// Ex: _my_field.faiss
final String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return segmentCommitInfo.files()
.stream()
.filter(fileName -> fileName.startsWith(prefix))
.filter(fileName -> fileName.endsWith(suffix))
.map(fileName -> shardPath.resolve(fileName).toString())
.map(fileName -> new EngineFileContext(spaceType, modelId, fileName, vectorDataType))
.map(vectorFileName -> new EngineFileContext(spaceType, modelId, vectorFileName, vectorDataType, segmentCommitInfo.info))
.collect(Collectors.toList());
}

Expand All @@ -227,7 +231,8 @@ List<EngineFileContext> getEngineFileContexts(
static class EngineFileContext {
private final SpaceType spaceType;
private final String modelId;
private final String indexPath;
private final String vectorFileName;
private final VectorDataType vectorDataType;
private final SegmentInfo segmentInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,82 +11,34 @@

package org.opensearch.knn.index.codec.KNN80Codec;

import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;

import java.io.IOException;

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.io.PathUtils;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

@Log4j2
public class KNN80DocValuesProducer extends DocValuesProducer {

private final SegmentReadState state;
private final DocValuesProducer delegate;
private final NativeMemoryCacheManager nativeMemoryCacheManager;
private final Map<String, String> indexPathMap = new HashMap();
private List<String> cacheKeys;

public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state) {
this.delegate = delegate;
this.state = state;
this.nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();

Directory directory = state.directory;
// directory would be CompoundDirectory, we need get directory firstly and then unwrap
if (state.directory instanceof KNN80CompoundDirectory) {
directory = ((KNN80CompoundDirectory) state.directory).getDir();
}

Directory dir = FilterDirectory.unwrap(directory);
if (!(dir instanceof FSDirectory)) {
log.warn("{} can not casting to FSDirectory", directory);
return;
}
String directoryPath = ((FSDirectory) dir).getDirectory().toString();
for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
// Only segments that contains BinaryDocValues and doesn't have vector values should be considered.
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues() == true) {
continue;
}
// Only Native Engine put into indexPathMap
KNNEngine knnEngine = getNativeKNNEngine(field);
if (knnEngine == null) {
continue;
}
List<String> engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, state.segmentInfo);
Path indexPath = PathUtils.get(directoryPath, engineFiles.get(0));
indexPathMap.putIfAbsent(field.getName(), indexPath.toString());

}
this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state);
}

@Override
Expand Down Expand Up @@ -121,32 +73,35 @@ public void checkIntegrity() throws IOException {

@Override
public void close() throws IOException {
for (String path : indexPathMap.values()) {
nativeMemoryCacheManager.invalidate(path);
}
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
cacheKeys.forEach(nativeMemoryCacheManager::invalidate);
delegate.close();
}

public final List<String> getOpenedIndexPath() {
return new ArrayList<>(indexPathMap.values());
public final List<String> getCacheKeys() {
return new ArrayList<>(cacheKeys);
}

/**
* Get KNNEngine From FieldInfo
*
* @param field which field we need produce from engine
* @return if and only if Native Engine we return specific engine, else return null
*/
private KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) {

final String modelId = field.attributes().get(MODEL_ID);
if (modelId != null) {
return null;
}
KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field);
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) {
return engine;
private static List<String> getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) {
final List<String> cacheKeys = new ArrayList<>();

for (FieldInfo field : segmentReadState.fieldInfos) {
// Only segments that contains BinaryDocValues and doesn't have vector values should be considered.
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues()) {
continue;
}

final String vectorIndexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(field, segmentReadState.segmentInfo);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo);
cacheKeys.add(cacheKey);
}
return null;

return cacheKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.UUIDs;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateReadConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -40,12 +45,14 @@
public class NativeEngines990KnnVectorsReader extends KnnVectorsReader {

private final FlatVectorsReader flatVectorsReader;
private final SegmentReadState segmentReadState;
private Map<String, String> quantizationStateCacheKeyPerField;
private SegmentReadState segmentReadState;
private final List<String> cacheKeys;

public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) throws IOException {
this.segmentReadState = state;
public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) {
this.flatVectorsReader = flatVectorsReader;
this.segmentReadState = state;
this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state);
loadCacheKeyMap();
}

Expand Down Expand Up @@ -176,10 +183,18 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
*/
@Override
public void close() throws IOException {
// Clean up allocated vector indices resources from cache.
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
cacheKeys.forEach(nativeMemoryCacheManager::invalidate);

// Close a reader.
IOUtils.close(flatVectorsReader);

// Clean up quantized state cache.
if (quantizationStateCacheKeyPerField != null) {
final QuantizationStateCacheManager quantizationStateCacheManager = QuantizationStateCacheManager.getInstance();
for (String cacheKey : quantizationStateCacheKeyPerField.values()) {
QuantizationStateCacheManager.getInstance().evict(cacheKey);
quantizationStateCacheManager.evict(cacheKey);
}
}
}
Expand All @@ -192,11 +207,26 @@ public long ramBytesUsed() {
return flatVectorsReader.ramBytesUsed();
}

private void loadCacheKeyMap() throws IOException {
private void loadCacheKeyMap() {
quantizationStateCacheKeyPerField = new HashMap<>();
for (FieldInfo fieldInfo : segmentReadState.fieldInfos) {
String cacheKey = UUIDs.base64UUID();
quantizationStateCacheKeyPerField.put(fieldInfo.getName(), cacheKey);
}
}

/**
 * Builds the list of native-memory cache keys for all fields in the given segment
 * that have an associated native engine vector file.
 * <p>
 * Fields without a native engine file (i.e. {@code getNativeEngineFileFromFieldInfo}
 * returns {@code null}) are skipped.
 *
 * @param segmentReadState segment read state supplying the field infos and segment info
 * @return cache keys, one per field that has a native engine vector file
 */
private static List<String> getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) {
    final List<String> collectedKeys = new ArrayList<>();

    for (final FieldInfo fieldInfo : segmentReadState.fieldInfos) {
        final String vectorFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(fieldInfo, segmentReadState.segmentInfo);
        // Only fields backed by a native engine file contribute a cache key.
        if (vectorFileName != null) {
            collectedKeys.add(NativeMemoryCacheKeyHelper.constructCacheKey(vectorFileName, segmentReadState.segmentInfo));
        }
    }

    return collectedKeys;
}
}
Loading
Loading