From 77e2e61eefcc27847593c266b470d2d8cb0a6a2e Mon Sep 17 00:00:00 2001 From: Doo Yong Kim <0ctopus13prime@gmail.com> Date: Mon, 21 Oct 2024 15:53:37 -0700 Subject: [PATCH] Remove FileWatcher from KNN (#2182) Signed-off-by: Dooyong Kim (cherry picked from commit e5599aa151cf43700d0ea327ad536cc106e5bb61) --- CHANGELOG.md | 1 + .../opensearch/knn/index/KNNIndexShard.java | 51 +++++---- .../KNN80Codec/KNN80DocValuesProducer.java | 105 +++++------------- .../NativeEngines990KnnVectorsReader.java | 40 ++++++- .../knn/index/codec/util/KNNCodecUtil.java | 45 ++++++++ .../util/NativeMemoryCacheKeyHelper.java | 45 ++++++++ .../index/memory/NativeMemoryAllocation.java | 56 ++++------ .../memory/NativeMemoryCacheManager.java | 4 +- .../memory/NativeMemoryEntryContext.java | 35 +++--- .../memory/NativeMemoryLoadStrategy.java | 65 ++++------- .../opensearch/knn/index/query/KNNWeight.java | 12 +- .../org/opensearch/knn/jni/JNIService.java | 1 - .../org/opensearch/knn/plugin/KNNPlugin.java | 1 - .../knn/index/KNNIndexShardTests.java | 46 ++++++-- .../KNN80DocValuesProducerTests.java | 10 +- .../knn/index/codec/KNNCodecTestCase.java | 9 -- .../memory/NativeMemoryAllocationTests.java | 46 ++------ .../memory/NativeMemoryCacheManagerTests.java | 27 ++--- .../memory/NativeMemoryEntryContextTests.java | 38 +++---- .../memory/NativeMemoryLoadStrategyTests.java | 16 +-- .../java/org/opensearch/knn/TestUtils.java | 5 + 21 files changed, 333 insertions(+), 325 deletions(-) create mode 100644 src/main/java/org/opensearch/knn/index/codec/util/NativeMemoryCacheKeyHelper.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 141d01c7dd..03d6d43a9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Add support to build vector data structures greedily and perform exact search when there are no engine files [#1942](https://github.com/opensearch-project/k-NN/issues/1942) * Add CompressionLevel Calculation for PQ [#2200](https://github.com/opensearch-project/k-NN/pull/2200) * Introduce a loading layer in native engine [#2185](https://github.com/opensearch-project/k-NN/pull/2185) +* Remove FSDirectory dependency from native engine constructing side and deprecated FileWatcher [#2182](https://github.com/opensearch-project/k-NN/pull/2182) ### Bug Fixes * Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946) * KNN80DocValues should only be considered for BinaryDocValues fields [#2147](https://github.com/opensearch-project/k-NN/pull/2147) diff --git a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java index a4d31c5464..ac4c055b01 100644 --- a/src/main/java/org/opensearch/knn/index/KNNIndexShard.java +++ b/src/main/java/org/opensearch/knn/index/KNNIndexShard.java @@ -12,14 +12,15 @@ import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.FilterDirectory; import org.opensearch.common.lucene.Lucene; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.knn.common.FieldInfoExtractor; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; import org.opensearch.knn.index.mapper.KNNVectorFieldMapper; import org.opensearch.knn.index.memory.NativeMemoryAllocation; @@ -29,9 +30,7 @@ import org.opensearch.knn.index.engine.KNNEngine; import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -94,14 +93,18 @@ public void warmup() throws IOException { try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) { getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> { try { + final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey( + engineFileContext.vectorFileName, + engineFileContext.segmentInfo + ); nativeMemoryCacheManager.get( new NativeMemoryEntryContext.IndexEntryContext( directory, - engineFileContext.getIndexPath(), + cacheKey, NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), getParametersAtLoading( engineFileContext.getSpaceType(), - KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()), + KNNEngine.getEngineNameFromPath(engineFileContext.getVectorFileName()), getIndexName(), engineFileContext.getVectorDataType() ), @@ -133,9 +136,13 @@ public void clearCache() { indexAllocation.writeLock(); log.info("[KNN] Evicting index from cache: [{}]", indexName); try (Engine.Searcher searcher = indexShard.acquireSearcher(INDEX_SHARD_CLEAR_CACHE_SEARCHER)) { - getAllEngineFileContexts(searcher.getIndexReader()).forEach( - (engineFileContext) -> nativeMemoryCacheManager.invalidate(engineFileContext.getIndexPath()) - ); + getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> { + final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey( + engineFileContext.vectorFileName, + engineFileContext.segmentInfo + ); + nativeMemoryCacheManager.invalidate(cacheKey); + }); } catch (IOException ex) { log.error("[KNN] Failed to evict index from cache: [{}]", indexName, ex); throw new RuntimeException(ex); @@ -166,7 +173,6 @@ List getEngineFileContexts(IndexReader indexReader, KNNEngine for (LeafReaderContext leafReaderContext : indexReader.leaves()) { SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader()); - Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory(); String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile() ? knnEngine.getCompoundExtension() : knnEngine.getExtension(); @@ -180,11 +186,9 @@ List getEngineFileContexts(IndexReader indexReader, KNNEngine String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null); engineFiles.addAll( getEngineFileContexts( - reader.getSegmentInfo().files(), - reader.getSegmentInfo().info.name, + reader.getSegmentInfo(), fieldInfo.name, fileExtension, - shardPath, spaceType, modelId, FieldInfoExtractor.extractQuantizationConfig(fieldInfo) == QuantizationConfig.EMPTY @@ -202,22 +206,22 @@ List getEngineFileContexts(IndexReader indexReader, KNNEngine @VisibleForTesting List getEngineFileContexts( - Collection files, - String segmentName, + SegmentCommitInfo segmentCommitInfo, String fieldName, String fileExtension, - Path shardPath, SpaceType spaceType, String modelId, VectorDataType vectorDataType - ) { - String prefix = buildEngineFilePrefix(segmentName); - String suffix = buildEngineFileSuffix(fieldName, fileExtension); - return files.stream() + ) throws IOException { + // Ex: 0_ + final String prefix = buildEngineFilePrefix(segmentCommitInfo.info.name); + // Ex: _my_field.faiss + final String suffix = buildEngineFileSuffix(fieldName, fileExtension); + return segmentCommitInfo.files() + .stream() .filter(fileName -> fileName.startsWith(prefix)) .filter(fileName -> fileName.endsWith(suffix)) - .map(fileName -> shardPath.resolve(fileName).toString()) - .map(fileName -> new EngineFileContext(spaceType, modelId, fileName, vectorDataType)) + .map(vectorFileName -> new EngineFileContext(spaceType, modelId, vectorFileName, vectorDataType, segmentCommitInfo.info)) .collect(Collectors.toList()); } @@ -227,7 +231,8 @@ List getEngineFileContexts( static class EngineFileContext { private final SpaceType spaceType; private final String modelId; - private final String indexPath; + private final String vectorFileName; private final VectorDataType vectorDataType; + private final SegmentInfo segmentInfo; } } diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducer.java b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducer.java index b78566f2ed..23c9f31051 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducer.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducer.java @@ -11,82 +11,34 @@ package org.opensearch.knn.index.codec.KNN80Codec; -import lombok.NonNull; import lombok.extern.log4j.Log4j2; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; + +import java.io.IOException; + import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.FilterDirectory; -import org.opensearch.common.io.PathUtils; -import org.opensearch.knn.common.FieldInfoExtractor; import org.opensearch.knn.index.codec.util.KNNCodecUtil; -import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.memory.NativeMemoryCacheManager; -import java.io.IOException; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import static org.opensearch.knn.common.KNNConstants.MODEL_ID; -import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD; @Log4j2 public class KNN80DocValuesProducer extends DocValuesProducer { - - private final SegmentReadState state; private final DocValuesProducer delegate; - private final NativeMemoryCacheManager nativeMemoryCacheManager; - private final Map indexPathMap = new HashMap(); + private List cacheKeys; public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state) { this.delegate = delegate; - this.state = state; - this.nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance(); - - Directory directory = state.directory; - // directory would be CompoundDirectory, we need get directory firstly and then unwrap - if (state.directory instanceof KNN80CompoundDirectory) { - directory = ((KNN80CompoundDirectory) state.directory).getDir(); - } - - Directory dir = FilterDirectory.unwrap(directory); - if (!(dir instanceof FSDirectory)) { - log.warn("{} can not casting to FSDirectory", directory); - return; - } - String directoryPath = ((FSDirectory) dir).getDirectory().toString(); - for (FieldInfo field : state.fieldInfos) { - if (!field.attributes().containsKey(KNN_FIELD)) { - continue; - } - // Only segments that contains BinaryDocValues and doesn't have vector values should be considered. - // By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true - // to create binary doc values explicitly like any other field. Hence, we only want to include fields - // where approximate search is possible only by BinaryDocValues. - if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues() == true) { - continue; - } - // Only Native Engine put into indexPathMap - KNNEngine knnEngine = getNativeKNNEngine(field); - if (knnEngine == null) { - continue; - } - List engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, state.segmentInfo); - Path indexPath = PathUtils.get(directoryPath, engineFiles.get(0)); - indexPathMap.putIfAbsent(field.getName(), indexPath.toString()); - - } + this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state); } @Override @@ -121,32 +73,35 @@ public void checkIntegrity() throws IOException { @Override public void close() throws IOException { - for (String path : indexPathMap.values()) { - nativeMemoryCacheManager.invalidate(path); - } + final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance(); + cacheKeys.forEach(nativeMemoryCacheManager::invalidate); delegate.close(); } - public final List getOpenedIndexPath() { - return new ArrayList<>(indexPathMap.values()); + public final List getCacheKeys() { + return new ArrayList<>(cacheKeys); } - /** - * Get KNNEngine From FieldInfo - * - * @param field which field we need produce from engine - * @return if and only if Native Engine we return specific engine, else return null - */ - private KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) { - - final String modelId = field.attributes().get(MODEL_ID); - if (modelId != null) { - return null; - } - KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field); - if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) { - return engine; + private static List getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) { + final List cacheKeys = new ArrayList<>(); + + for (FieldInfo field : segmentReadState.fieldInfos) { + // Only segments that contains BinaryDocValues and doesn't have vector values should be considered. + // By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true + // to create binary doc values explicitly like any other field. Hence, we only want to include fields + // where approximate search is possible only by BinaryDocValues. + if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues()) { + continue; + } + + final String vectorIndexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(field, segmentReadState.segmentInfo); + if (vectorIndexFileName == null) { + continue; + } + final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo); + cacheKeys.add(cacheKey); } - return null; + + return cacheKeys; } } diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsReader.java b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsReader.java index 16631fd97b..efabc3a70f 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsReader.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsReader.java @@ -24,13 +24,18 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.IOUtils; import org.opensearch.common.UUIDs; +import org.opensearch.knn.index.codec.util.KNNCodecUtil; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; +import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import org.opensearch.knn.index.quantizationservice.QuantizationService; import org.opensearch.knn.quantization.models.quantizationState.QuantizationState; import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager; import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateReadConfig; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -40,12 +45,14 @@ public class NativeEngines990KnnVectorsReader extends KnnVectorsReader { private final FlatVectorsReader flatVectorsReader; - private final SegmentReadState segmentReadState; private Map quantizationStateCacheKeyPerField; + private SegmentReadState segmentReadState; + private final List cacheKeys; - public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) throws IOException { - this.segmentReadState = state; + public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) { this.flatVectorsReader = flatVectorsReader; + this.segmentReadState = state; + this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state); loadCacheKeyMap(); } @@ -176,10 +183,18 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits */ @Override public void close() throws IOException { + // Clean up allocated vector indices resources from cache. + final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance(); + cacheKeys.forEach(nativeMemoryCacheManager::invalidate); + + // Close a reader. IOUtils.close(flatVectorsReader); + + // Clean up quantized state cache. if (quantizationStateCacheKeyPerField != null) { + final QuantizationStateCacheManager quantizationStateCacheManager = QuantizationStateCacheManager.getInstance(); for (String cacheKey : quantizationStateCacheKeyPerField.values()) { - QuantizationStateCacheManager.getInstance().evict(cacheKey); + quantizationStateCacheManager.evict(cacheKey); } } } @@ -192,11 +207,26 @@ public long ramBytesUsed() { return flatVectorsReader.ramBytesUsed(); } - private void loadCacheKeyMap() throws IOException { + private void loadCacheKeyMap() { quantizationStateCacheKeyPerField = new HashMap<>(); for (FieldInfo fieldInfo : segmentReadState.fieldInfos) { String cacheKey = UUIDs.base64UUID(); quantizationStateCacheKeyPerField.put(fieldInfo.getName(), cacheKey); } } + + private static List getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) { + final List cacheKeys = new ArrayList<>(); + + for (FieldInfo field : segmentReadState.fieldInfos) { + final String vectorIndexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(field, segmentReadState.segmentInfo); + if (vectorIndexFileName == null) { + continue; + } + final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo); + cacheKeys.add(cacheKey); + } + + return cacheKeys; + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java index 84c7c46752..3ccfc3c2bd 100644 --- a/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java +++ b/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java @@ -5,8 +5,11 @@ package org.opensearch.knn.index.codec.util; +import lombok.NonNull; import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.SegmentInfo; +import org.opensearch.knn.common.FieldInfoExtractor; import org.opensearch.knn.common.KNNConstants; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.codec.KNN80Codec.KNN80BinaryDocValues; @@ -16,6 +19,8 @@ import java.util.List; import java.util.stream.Collectors; +import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD; + public class KNNCodecUtil { // Floats are 4 bytes in size public static final int FLOAT_BYTE_SIZE = 4; @@ -84,4 +89,44 @@ public static List getEngineFiles(String extension, String fieldName, Se .collect(Collectors.toList()); return engineFiles; } + + /** + * Get engine file name from given field and segment info. + * Ex: _0_165_my_field.faiss + * + * @param field : Field info that might have a vector index file. Not always it has it. + * @param segmentInfo : Segment where we are collecting an engine file list. + * @return : Found vector engine names, if not found, returns null. + */ + public static String getNativeEngineFileFromFieldInfo(FieldInfo field, SegmentInfo segmentInfo) { + if (!field.attributes().containsKey(KNN_FIELD)) { + return null; + } + // Only Native Engine put into indexPathMap + final KNNEngine knnEngine = getNativeKNNEngine(field); + if (knnEngine == null) { + return null; + } + final List engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, segmentInfo); + if (engineFiles.isEmpty()) { + return null; + } else { + final String vectorIndexFileName = engineFiles.get(0); + return vectorIndexFileName; + } + } + + /** + * Get KNNEngine From FieldInfo + * + * @param field which field we need produce from engine + * @return if and only if Native Engine we return specific engine, else return null + */ + private static KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) { + final KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field); + if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) { + return engine; + } + return null; + } } diff --git a/src/main/java/org/opensearch/knn/index/codec/util/NativeMemoryCacheKeyHelper.java b/src/main/java/org/opensearch/knn/index/codec/util/NativeMemoryCacheKeyHelper.java new file mode 100644 index 0000000000..8d50bf0298 --- /dev/null +++ b/src/main/java/org/opensearch/knn/index/codec/util/NativeMemoryCacheKeyHelper.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.index.codec.util; + +import org.apache.lucene.index.SegmentInfo; + +import java.util.Base64; + +public final class NativeMemoryCacheKeyHelper { + private NativeMemoryCacheKeyHelper() {} + + /** + * Construct a unique cache key for look-up operation in {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager} + * + * @param vectorIndexFileName Vector index file name. Ex: _0_165_test_field.faiss. + * @param segmentInfo Segment info object representing a logical segment unit containing a vector index. + * @return Unique cache key that can be used for look-up and invalidating in + * {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager} + */ + public static String constructCacheKey(final String vectorIndexFileName, final SegmentInfo segmentInfo) { + final String segmentId = Base64.getEncoder().encodeToString(segmentInfo.getId()); + final String cacheKey = vectorIndexFileName + "@" + segmentId; + return cacheKey; + } + + /** + * From cacheKey, we extract a vector file name. + * Note that expected format of cacheKey consists of two part with '@' as a delimiter. + * First part would be the vector file name, the second one is the segment id. + * + * @param cacheKey : Cache key for {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager} + * @return : Vector file name, if the given cacheKey was invalid format, returns null. + */ + public static String extractVectorIndexFileName(final String cacheKey) { + final int indexOfDelimiter = cacheKey.indexOf('@'); + if (indexOfDelimiter != -1) { + final String vectorFileName = cacheKey.substring(0, indexOfDelimiter); + return vectorFileName; + } + return null; + } +} diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java index 360c827f95..9ac3caa239 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java @@ -21,8 +21,6 @@ import org.opensearch.knn.index.query.KNNWeight; import org.opensearch.knn.jni.JNIService; import org.opensearch.knn.index.engine.KNNEngine; -import org.opensearch.watcher.FileWatcher; -import org.opensearch.watcher.WatcherHandle; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -115,11 +113,10 @@ class IndexAllocation implements NativeMemoryAllocation { @Getter private final KNNEngine knnEngine; @Getter - private final String indexPath; + private final String vectorFileName; @Getter private final String openSearchIndexName; private final ReadWriteLock readWriteLock; - private final WatcherHandle watcherHandle; private final SharedIndexState sharedIndexState; @Getter private final boolean isBinaryIndex; @@ -132,20 +129,18 @@ class IndexAllocation implements NativeMemoryAllocation { * @param memoryAddress Pointer in memory to the index * @param sizeKb Size this index consumes in kilobytes * @param knnEngine KNNEngine associated with the index allocation - * @param indexPath File path to index + * @param vectorFileName Vector file name. Ex: _0_165_my_field.faiss * @param openSearchIndexName Name of OpenSearch index this index is associated with - * @param watcherHandle Handle for watching index file */ IndexAllocation( ExecutorService executorService, long memoryAddress, int sizeKb, KNNEngine knnEngine, - String indexPath, - String openSearchIndexName, - WatcherHandle watcherHandle + String vectorFileName, + String openSearchIndexName ) { - this(executorService, memoryAddress, sizeKb, knnEngine, indexPath, openSearchIndexName, watcherHandle, null, false); + this(executorService, memoryAddress, sizeKb, knnEngine, vectorFileName, openSearchIndexName, null, false); } /** @@ -155,9 +150,8 @@ class IndexAllocation implements NativeMemoryAllocation { * @param memoryAddress Pointer in memory to the index * @param sizeKb Size this index consumes in kilobytes * @param knnEngine KNNEngine associated with the index allocation - * @param indexPath File path to index + * @param vectorFileName Vector file name. Ex: _0_165_my_field.faiss * @param openSearchIndexName Name of OpenSearch index this index is associated with - * @param watcherHandle Handle for watching index file * @param sharedIndexState Shared index state. If not shared state present, pass null. */ IndexAllocation( @@ -165,21 +159,19 @@ class IndexAllocation implements NativeMemoryAllocation { long memoryAddress, int sizeKb, KNNEngine knnEngine, - String indexPath, + String vectorFileName, String openSearchIndexName, - WatcherHandle watcherHandle, SharedIndexState sharedIndexState, boolean isBinaryIndex ) { this.executor = executorService; this.closed = false; this.knnEngine = knnEngine; - this.indexPath = indexPath; + this.vectorFileName = vectorFileName; this.openSearchIndexName = openSearchIndexName; this.memoryAddress = memoryAddress; this.readWriteLock = new ReentrantReadWriteLock(); this.sizeKb = sizeKb; - this.watcherHandle = watcherHandle; this.sharedIndexState = sharedIndexState; this.isBinaryIndex = isBinaryIndex; this.refCounted = new RefCountedReleasable<>("IndexAllocation-Reference", this, this::closeInternal); @@ -218,8 +210,6 @@ private void cleanup() { this.closed = true; - watcherHandle.stop(); - // memoryAddress is sometimes initialized to 0. If this is ever the case, freeing will surely fail. if (memoryAddress != 0) { JNIService.free(memoryAddress, knnEngine, isBinaryIndex); @@ -294,30 +284,31 @@ class TrainingDataAllocation implements NativeMemoryAllocation { private final ExecutorService executor; private volatile boolean closed; + @Setter private long memoryAddress; - private final int size; + private final int sizeKb; @Getter @Setter private QuantizationConfig quantizationConfig = QuantizationConfig.EMPTY; // Implement reader/writer with semaphores to deal with passing lock conditions between threads private int readCount; - private Semaphore readSemaphore; - private Semaphore writeSemaphore; - private VectorDataType vectorDataType; + private final Semaphore readSemaphore; + private final Semaphore writeSemaphore; + private final VectorDataType vectorDataType; /** * Constructor * * @param executor Executor used for allocation close * @param memoryAddress pointer in memory to the training data allocation - * @param size amount memory needed for allocation in kilobytes + * @param sizeKb amount memory needed for allocation in kilobytes */ - public TrainingDataAllocation(ExecutorService executor, long memoryAddress, int size, VectorDataType vectorDataType) { + public TrainingDataAllocation(ExecutorService executor, long memoryAddress, int sizeKb, VectorDataType vectorDataType) { this.executor = executor; this.closed = false; this.memoryAddress = memoryAddress; - this.size = size; + this.sizeKb = sizeKb; this.readCount = 0; this.readSemaphore = new Semaphore(1); @@ -401,7 +392,7 @@ public void readLock() { /** * A write lock will be obtained either on eviction from {@link NativeMemoryCacheManager NativeMemoryManager's} * or when training data is actually being loaded. A semaphore is used because collecting training data - * happens asynchrously, so the thread that obtains the lock will not be the same thread that releases the + * happens asynchronously, so the thread that obtains the lock will not be the same thread that releases the * lock. */ @Override @@ -438,23 +429,14 @@ public void writeUnlock() { @Override public int getSizeInKB() { - return size; - } - - /** - * Setter for memory address to training data - * - * @param memoryAddress Pointer to training data - */ - public void setMemoryAddress(long memoryAddress) { - this.memoryAddress = memoryAddress; + return sizeKb; } } /** * An anonymous allocation is used to reserve space in the native memory cache. It does not have a * memory address. This allocation type should be used when a function allocates a large portion of memory in the - * function, runs for awhile, and then frees it. + * function, runs for a while, and then frees it. */ class AnonymousAllocation implements NativeMemoryAllocation { diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 649fb9774e..b8aecc5a57 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -291,8 +291,8 @@ public CacheStats getCacheStats() { public NativeMemoryAllocation get(NativeMemoryEntryContext nativeMemoryEntryContext, boolean isAbleToTriggerEviction) throws ExecutionException { if (!isAbleToTriggerEviction - && !cache.asMap().containsKey(nativeMemoryEntryContext.getKey()) - && maxWeight - getCacheSizeInKilobytes() - nativeMemoryEntryContext.calculateSizeInKB() <= 0) { + && (maxWeight - getCacheSizeInKilobytes() - nativeMemoryEntryContext.calculateSizeInKB()) <= 0 + && !cache.asMap().containsKey(nativeMemoryEntryContext.getKey())) { throw new OutOfNativeMemoryException( "Entry cannot be loaded into cache because it would not fit. " + "Entry size: " diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java index 00bf023f90..0af13fb46a 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java @@ -15,14 +15,13 @@ import org.apache.lucene.store.Directory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; -import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.index.VectorDataType; import java.io.IOException; import java.util.Map; import java.util.UUID; -import java.util.function.Function; /** * Encapsulates all information needed to load a component into native memory. @@ -80,26 +79,26 @@ public static class IndexEntryContext extends NativeMemoryEntryContext parameters, String openSearchIndexName ) { - this(directory, indexPath, indexLoadStrategy, parameters, openSearchIndexName, null); + this(directory, vectorIndexCacheKey, indexLoadStrategy, parameters, openSearchIndexName, null); } /** * Constructor * * @param directory Lucene directory to create required IndexInput/IndexOutput to access files. - * @param indexPath path to index file. Also used as key in cache. + * @param vectorIndexCacheKey Cache key for {@link NativeMemoryCacheManager}. It must contain a vector file name. * @param indexLoadStrategy strategy to load index into memory * @param parameters load time parameters * @param openSearchIndexName opensearch index associated with index @@ -107,13 +106,13 @@ public IndexEntryContext( */ public IndexEntryContext( Directory directory, - String indexPath, + String vectorIndexCacheKey, NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy, Map parameters, String openSearchIndexName, String modelId ) { - super(indexPath); + super(vectorIndexCacheKey); this.directory = directory; this.indexLoadStrategy = indexLoadStrategy; this.openSearchIndexName = openSearchIndexName; @@ -123,25 +122,19 @@ public IndexEntryContext( @Override public Integer calculateSizeInKB() { - return IndexSizeCalculator.INSTANCE.apply(this); + final String indexFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(key); + try { + final long fileLength = directory.fileLength(indexFileName); + return (int) (fileLength / 1024L); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override public NativeMemoryAllocation.IndexAllocation load() throws IOException { return indexLoadStrategy.load(this); } - - private static class IndexSizeCalculator implements Function { - - static IndexSizeCalculator INSTANCE = new IndexSizeCalculator(); - - IndexSizeCalculator() {} - - @Override - public Integer apply(IndexEntryContext indexEntryContext) { - return IndexUtil.getFileSizeInKB(indexEntryContext.getKey()); - } - } } public static class TrainingDataEntryContext extends NativeMemoryEntryContext { diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java index 5daa1e047a..8cbdb4fd71 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java @@ -16,6 +16,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.core.action.ActionListener; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; import org.opensearch.knn.index.store.IndexInputWithBuffer; import org.opensearch.knn.index.util.IndexUtil; @@ -23,15 +24,9 @@ import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.knn.training.TrainingDataConsumer; import org.opensearch.knn.training.VectorReader; -import org.opensearch.watcher.FileChangesListener; -import org.opensearch.watcher.FileWatcher; -import org.opensearch.watcher.ResourceWatcherService; -import org.opensearch.watcher.WatcherHandle; import java.io.Closeable; import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,8 +52,6 @@ class IndexLoadStrategy private static IndexLoadStrategy INSTANCE; private final ExecutorService executor; - private final FileChangesListener indexFileOnDeleteListener; - private ResourceWatcherService resourceWatcherService; /** * Get Singleton of this load strategy. @@ -72,47 +65,34 @@ public static synchronized IndexLoadStrategy getInstance() { return INSTANCE; } - /** - * Initialize singleton. - * - * @param resourceWatcherService service used to monitor index files for deletion - */ - public static void initialize(final ResourceWatcherService resourceWatcherService) { - getInstance().resourceWatcherService = resourceWatcherService; - } - private IndexLoadStrategy() { executor = Executors.newSingleThreadExecutor(); - indexFileOnDeleteListener = new FileChangesListener() { - @Override - public void onFileDeleted(Path indexFilePath) { - NativeMemoryCacheManager.getInstance().invalidate(indexFilePath.toString()); - } - }; } @Override public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.IndexEntryContext indexEntryContext) throws IOException { - final Path absoluteIndexPath = Paths.get(indexEntryContext.getKey()); - final KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(absoluteIndexPath.toString()); - final FileWatcher fileWatcher = new FileWatcher(absoluteIndexPath); - fileWatcher.addListener(indexFileOnDeleteListener); - fileWatcher.init(); + // Extract vector file name from the given cache key. + // Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg== + final String cacheKey = indexEntryContext.getKey(); + final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey); + if (vectorFileName == null) { + throw new IllegalStateException( + "Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name." + ); + } + // Prepare for opening index input from directory. + final KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(vectorFileName); final Directory directory = indexEntryContext.getDirectory(); + final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024); - // Ex: Input -> /a/b/c/_0_NativeEngines990KnnVectorsFormat_0.vec - // Output -> _0_NativeEngines990KnnVectorsFormat_0.vec - final String logicalIndexPath = absoluteIndexPath.getFileName().toString(); - - final int indexSizeKb = Math.toIntExact(directory.fileLength(logicalIndexPath) / 1024); + // Try to open an index input then pass it down to native engine for loading an index. + try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) { + final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream); + final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine); - try (IndexInput readStream = directory.openInput(logicalIndexPath, IOContext.READONCE)) { - IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream); - long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine); - - return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, fileWatcher, indexSizeKb, absoluteIndexPath); + return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName); } } @@ -120,10 +100,9 @@ private NativeMemoryAllocation.IndexAllocation createIndexAllocation( final NativeMemoryEntryContext.IndexEntryContext indexEntryContext, final KNNEngine knnEngine, final long indexAddress, - final FileWatcher fileWatcher, final int indexSizeKb, - final Path absoluteIndexPath - ) throws IOException { + final String vectorFileName + ) { SharedIndexState sharedIndexState = null; String modelId = indexEntryContext.getModelId(); if (IndexUtil.isSharedIndexStateRequired(knnEngine, modelId, indexAddress)) { @@ -132,15 +111,13 @@ private NativeMemoryAllocation.IndexAllocation createIndexAllocation( JNIService.setSharedIndexState(indexAddress, sharedIndexState.getSharedIndexStateAddress(), knnEngine); } - final WatcherHandle watcherHandle = resourceWatcherService.add(fileWatcher); return new NativeMemoryAllocation.IndexAllocation( executor, indexAddress, indexSizeKb, knnEngine, - absoluteIndexPath.toString(), + vectorFileName, indexEntryContext.getOpenSearchIndexName(), - watcherHandle, sharedIndexState, IndexUtil.isBinaryIndex(knnEngine, indexEntryContext.getParameters()) ); diff --git a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java index 87c99b884c..b5b2a5d22f 100644 --- a/src/main/java/org/opensearch/knn/index/query/KNNWeight.java +++ b/src/main/java/org/opensearch/knn/index/query/KNNWeight.java @@ -15,13 +15,10 @@ import org.apache.lucene.search.FilteredDocIdSetIterator; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSetIterator; import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; -import org.opensearch.common.io.PathUtils; import org.opensearch.common.lucene.Lucene; import org.opensearch.knn.common.FieldInfoExtractor; import org.opensearch.knn.common.KNNConstants; @@ -29,6 +26,7 @@ import org.opensearch.knn.index.SpaceType; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.codec.util.KNNCodecUtil; +import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.memory.NativeMemoryAllocation; import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import org.opensearch.knn.index.memory.NativeMemoryEntryContext; @@ -43,7 +41,6 @@ import org.opensearch.knn.plugin.stats.KNNCounter; import java.io.IOException; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -229,7 +226,6 @@ private Map doANNSearch( final int k ) throws IOException { final SegmentReader reader = Lucene.segmentReader(context.reader()); - String directory = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory().toString(); FieldInfo fieldInfo = reader.getFieldInfos().fieldInfo(knnQuery.getField()); @@ -278,7 +274,9 @@ private Map doANNSearch( return Collections.emptyMap(); } - Path indexPath = PathUtils.get(directory, engineFiles.get(0)); + final String vectorIndexFileName = engineFiles.get(0); + final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, reader.getSegmentInfo().info); + final KNNQueryResult[] results; KNNCounter.GRAPH_QUERY_REQUESTS.increment(); @@ -288,7 +286,7 @@ private Map doANNSearch( indexAllocation = nativeMemoryCacheManager.get( new NativeMemoryEntryContext.IndexEntryContext( reader.directory(), - indexPath.toString(), + cacheKey, NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), getParametersAtLoading( spaceType, diff --git a/src/main/java/org/opensearch/knn/jni/JNIService.java b/src/main/java/org/opensearch/knn/jni/JNIService.java index a0daf65a76..dd4dcef17c 100644 --- a/src/main/java/org/opensearch/knn/jni/JNIService.java +++ b/src/main/java/org/opensearch/knn/jni/JNIService.java @@ -135,7 +135,6 @@ public static void createIndex( Map parameters, KNNEngine knnEngine ) { - if (KNNEngine.NMSLIB == knnEngine) { NmslibService.createIndex(ids, vectorsAddress, dim, indexPath, parameters); return; diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index cae8960bb6..d05bbaa05d 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -195,7 +195,6 @@ public Collection createComponents( this.clusterService = clusterService; // Initialize Native Memory loading strategies - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); VectorReader vectorReader = new VectorReader(client); NativeMemoryLoadStrategy.TrainingLoadStrategy.initialize(vectorReader); diff --git a/src/test/java/org/opensearch/knn/index/KNNIndexShardTests.java b/src/test/java/org/opensearch/knn/index/KNNIndexShardTests.java index aaeee31e1e..18b9656e54 100644 --- a/src/test/java/org/opensearch/knn/index/KNNIndexShardTests.java +++ b/src/test/java/org/opensearch/knn/index/KNNIndexShardTests.java @@ -8,6 +8,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import lombok.SneakyThrows; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.Version; +import org.mockito.Mockito; import org.opensearch.knn.KNNSingleNodeTestCase; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; @@ -15,8 +21,9 @@ import org.opensearch.knn.index.memory.NativeMemoryCacheManager; import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -113,11 +120,14 @@ public void testGetAllEngineFileContexts() throws IOException, ExecutionExceptio searcher = indexShard.acquireSearcher("test-hnsw-paths-2"); engineFileContexts = knnIndexShard.getAllEngineFileContexts(searcher.getIndexReader()); assertEquals(1, engineFileContexts.size()); - List paths = engineFileContexts.stream().map(KNNIndexShard.EngineFileContext::getIndexPath).collect(Collectors.toList()); + List paths = engineFileContexts.stream() + .map(KNNIndexShard.EngineFileContext::getVectorFileName) + .collect(Collectors.toList()); assertTrue(paths.get(0).contains("hnsw") || paths.get(0).contains("hnswc")); searcher.close(); } + @SneakyThrows public void testGetEngineFileContexts() { // Check that the correct engine paths are being returned by the KNNIndexShard String segmentName = "_0"; @@ -143,20 +153,40 @@ public void testGetEngineFileContexts() { KNNIndexShard knnIndexShard = new KNNIndexShard(null); - Path path = Paths.get(""); - List included = knnIndexShard.getEngineFileContexts( - files, + final Directory dummyDirectory = Mockito.mock(Directory.class); + final SegmentInfo segmentInfo = new SegmentInfo( + dummyDirectory, + Version.LATEST, + null, segmentName, + 0, + false, + false, + null, + Collections.emptyMap(), + new byte[StringHelper.ID_LENGTH], + Collections.emptyMap(), + null + ); + // Inject 'files' into the segment info instance. + // Since SegmentInfo class does trim out its given file list, for example removing segment name from a file name etc, + // we can't just use 'setFiles' api to assign the file list. Which will lead this unit test to be fail. + final Field setFilesPrivateField = SegmentInfo.class.getDeclaredField("setFiles"); + setFilesPrivateField.setAccessible(true); + setFilesPrivateField.set(segmentInfo, new HashSet<>(files)); + + final SegmentCommitInfo segmentCommitInfo = new SegmentCommitInfo(segmentInfo, 0, 0, -1, 0, 0, null); + List included = knnIndexShard.getEngineFileContexts( + segmentCommitInfo, fieldName, fileExt, - path, spaceType, modelId, vectorDataType ); assertEquals(includedFileNames.size(), included.size()); - included.stream().map(KNNIndexShard.EngineFileContext::getIndexPath).forEach(o -> assertTrue(includedFileNames.contains(o))); + included.stream().map(KNNIndexShard.EngineFileContext::getVectorFileName).forEach(o -> assertTrue(includedFileNames.contains(o))); } @SneakyThrows diff --git a/src/test/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducerTests.java b/src/test/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducerTests.java index ccceee62b2..6b18c51c30 100644 --- a/src/test/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducerTests.java +++ b/src/test/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesProducerTests.java @@ -121,11 +121,11 @@ public void testProduceKNNBinaryField_fromCodec_nmslibCurrent() throws IOExcepti assertTrue(docValuesFormat instanceof KNN80DocValuesFormat); DocValuesProducer producer = docValuesFormat.fieldsProducer(state); assertTrue(producer instanceof KNN80DocValuesProducer); - int pathSize = ((KNN80DocValuesProducer) producer).getOpenedIndexPath().size(); - assertEquals(pathSize, 1); + int cacheKeySize = ((KNN80DocValuesProducer) producer).getCacheKeys().size(); + assertEquals(cacheKeySize, 1); - String path = ((KNN80DocValuesProducer) producer).getOpenedIndexPath().get(0); - assertTrue(path.contains(segmentFiles.get(0))); + String cacheKey = ((KNN80DocValuesProducer) producer).getCacheKeys().get(0); + assertTrue(cacheKey.contains(segmentFiles.get(0))); } public void testProduceKNNBinaryField_whenFieldHasNonBinaryDocValues_thenSkipThoseField() throws IOException { @@ -178,7 +178,7 @@ public void testProduceKNNBinaryField_whenFieldHasNonBinaryDocValues_thenSkipTho assertTrue(docValuesFormat instanceof KNN80DocValuesFormat); DocValuesProducer producer = docValuesFormat.fieldsProducer(state); assertTrue(producer instanceof KNN80DocValuesProducer); - assertEquals(0, ((KNN80DocValuesProducer) producer).getOpenedIndexPath().size()); + assertEquals(0, ((KNN80DocValuesProducer) producer).getCacheKeys().size()); } } diff --git a/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java b/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java index 3d9969a1e2..e6fcb643d9 100644 --- a/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java +++ b/src/test/java/org/opensearch/knn/index/codec/KNNCodecTestCase.java @@ -185,8 +185,6 @@ public void testMultiFieldsKnnIndex(Codec codec) throws Exception { writer.flush(); IndexReader reader = writer.getReader(); writer.close(); - ResourceWatcherService resourceWatcherService = createDisabledResourceWatcherService(); - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); List hnswfiles = Arrays.stream(dir.listAll()).filter(x -> x.contains("hnsw")).collect(Collectors.toList()); // there should be 2 hnsw index files created. one for test_vector and one for my_vector @@ -213,7 +211,6 @@ public void testMultiFieldsKnnIndex(Codec codec) throws Exception { reader.close(); dir.close(); - resourceWatcherService.close(); NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close(); } @@ -298,8 +295,6 @@ public void testBuildFromModelTemplate(Codec codec) throws IOException, Executio // Make sure that search returns the correct results KNNWeight.initialize(modelDao); - ResourceWatcherService resourceWatcherService = createDisabledResourceWatcherService(); - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); float[] query = { 10.0f, 10.0f, 10.0f }; IndexSearcher searcher = new IndexSearcher(reader); TopDocs topDocs = searcher.search(new KNNQuery(fieldName, query, 4, "dummy", (BitSetProducer) null), 10); @@ -311,7 +306,6 @@ public void testBuildFromModelTemplate(Codec codec) throws IOException, Executio reader.close(); dir.close(); - resourceWatcherService.close(); NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close(); } } @@ -422,8 +416,6 @@ public void testKnnVectorIndex( writer.addDocument(doc1); IndexReader reader1 = writer.getReader(); writer.close(); - ResourceWatcherService resourceWatcherService = createDisabledResourceWatcherService(); - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); verify(perFieldKnnVectorsFormatSpy, atLeastOnce()).getKnnVectorsFormatForField(eq(FIELD_NAME_TWO)); verify(perFieldKnnVectorsFormatSpy, atLeastOnce()).getMaxDimensions(eq(FIELD_NAME_TWO)); @@ -444,7 +436,6 @@ public void testKnnVectorIndex( reader1.close(); dir.close(); - resourceWatcherService.close(); NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close(); } } diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java index 906ff4cb7d..db6231adff 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java @@ -26,8 +26,6 @@ import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.jni.JNICommons; import org.opensearch.knn.jni.JNIService; -import org.opensearch.watcher.FileWatcher; -import org.opensearch.watcher.WatcherHandle; import java.nio.file.Path; import java.util.Arrays; @@ -40,7 +38,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING; @@ -86,10 +83,6 @@ public void testIndexAllocation_close() throws InterruptedException { // Load index into memory long memoryAddress = JNIService.loadIndex(path, parameters, knnEngine); - @SuppressWarnings("unchecked") - WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); - doNothing().when(watcherHandle).stop(); - ExecutorService executorService = Executors.newSingleThreadExecutor(); NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation( executorService, @@ -97,8 +90,7 @@ public void testIndexAllocation_close() throws InterruptedException { IndexUtil.getFileSizeInKB(path), knnEngine, path, - "test", - watcherHandle + "test" ); indexAllocation.close(); @@ -147,10 +139,6 @@ public void testClose_whenBinaryFiass_thenSuccess() { // Load index into memory long memoryAddress = JNIService.loadIndex(path, parameters, knnEngine); - @SuppressWarnings("unchecked") - WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); - doNothing().when(watcherHandle).stop(); - ExecutorService executorService = Executors.newSingleThreadExecutor(); NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation( executorService, @@ -159,7 +147,6 @@ public void testClose_whenBinaryFiass_thenSuccess() { knnEngine, path, "test", - watcherHandle, null, true ); @@ -189,8 +176,7 @@ public void testIndexAllocation_getMemoryAddress() { 0, null, "test", - "test", - null + "test" ); assertEquals(memoryAddress, indexAllocation.getMemoryAddress()); @@ -205,8 +191,7 @@ public void testIndexAllocation_readLock() throws InterruptedException { 0, null, "test", - "test", - null + "test" ); int initialValue = 10; @@ -232,7 +217,6 @@ public void testIndexAllocation_readLock() throws InterruptedException { } public void testIndexAllocation_closeDefault() { - WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); ExecutorService executorService = Executors.newFixedThreadPool(2); AtomicReference expectedException = new AtomicReference<>(); @@ -243,8 +227,7 @@ public void testIndexAllocation_closeDefault() { 0, null, "test", - "test", - watcherHandle + "test" ); executorService.submit(nonBlockingIndexAllocation::readLock); @@ -261,7 +244,6 @@ public void testIndexAllocation_closeDefault() { public void testIndexAllocation_closeBlocking() throws InterruptedException, ExecutionException { // Prepare mocking and a thread pool. - WatcherHandle watcherHandle = (WatcherHandle) mock(WatcherHandle.class); ExecutorService executorService = Executors.newSingleThreadExecutor(); // Enable `KNN_FORCE_EVICT_CACHE_ENABLED_SETTING` to force it to block other threads. @@ -273,8 +255,7 @@ public void testIndexAllocation_closeBlocking() throws InterruptedException, Exe 0, null, "test", - "test", - watcherHandle + "test" ); // Acquire a read lock @@ -309,8 +290,7 @@ public void testIndexAllocation_writeLock() throws InterruptedException { 0, null, "test", - "test", - null + "test" ); int initialValue = 10; @@ -342,8 +322,7 @@ public void testIndexAllocation_getSize() { size, null, "test", - "test", - null + "test" ); assertEquals(size, indexAllocation.getSizeInKB()); @@ -357,8 +336,7 @@ public void testIndexAllocation_getKnnEngine() { 0, knnEngine, "test", - "test", - null + "test" ); assertEquals(knnEngine, indexAllocation.getKnnEngine()); @@ -372,11 +350,10 @@ public void testIndexAllocation_getIndexPath() { 0, null, indexPath, - "test", - null + "test" ); - assertEquals(indexPath, indexAllocation.getIndexPath()); + assertEquals(indexPath, indexAllocation.getVectorFileName()); } public void testIndexAllocation_getOsIndexName() { @@ -387,8 +364,7 @@ public void testIndexAllocation_getOsIndexName() { 0, null, "test", - osIndexName, - null + osIndexName ); assertEquals(osIndexName, indexAllocation.getOpenSearchIndexName()); diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 85eaf3322b..4baf66cb4a 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -118,8 +118,7 @@ public void testGetIndexSizeInKilobytes() throws ExecutionException, IOException indexEntryWeight, null, key, - indexName, - null + indexName ); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = mock(NativeMemoryEntryContext.IndexEntryContext.class); @@ -152,8 +151,7 @@ public void testGetIndexSizeAsPercentage() throws ExecutionException, IOExceptio indexEntryWeight, null, key, - indexName, - null + indexName ); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = mock(NativeMemoryEntryContext.IndexEntryContext.class); @@ -231,8 +229,7 @@ public void testGetIndexGraphCount() throws ExecutionException, IOException { indexEntryWeight, null, key1, - indexName1, - null + indexName1 ); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = mock(NativeMemoryEntryContext.IndexEntryContext.class); @@ -247,8 +244,7 @@ public void testGetIndexGraphCount() throws ExecutionException, IOException { indexEntryWeight, null, key2, - indexName1, - null + indexName1 ); indexEntryContext = mock(NativeMemoryEntryContext.IndexEntryContext.class); @@ -263,8 +259,7 @@ public void testGetIndexGraphCount() throws ExecutionException, IOException { indexEntryWeight, null, key3, - indexName2, - null + indexName2 ); indexEntryContext = mock(NativeMemoryEntryContext.IndexEntryContext.class); @@ -408,8 +403,7 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException { size1, null, testKey1, - indexName1, - null + indexName1 ); NativeMemoryAllocation.IndexAllocation indexAllocation2 = new NativeMemoryAllocation.IndexAllocation( @@ -418,8 +412,7 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException { size2, null, testKey2, - indexName1, - null + indexName1 ); NativeMemoryAllocation.IndexAllocation indexAllocation3 = new NativeMemoryAllocation.IndexAllocation( @@ -428,8 +421,7 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException { size1, null, testKey3, - indexName2, - null + indexName2 ); NativeMemoryAllocation.IndexAllocation indexAllocation4 = new NativeMemoryAllocation.IndexAllocation( @@ -438,8 +430,7 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException { size2, null, testKey4, - indexName2, - null + indexName2 ); NativeMemoryEntryContext.IndexEntryContext indexEntryContext1 = mock(NativeMemoryEntryContext.IndexEntryContext.class); diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java index 72cab9a1bd..5379abc74d 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java @@ -13,23 +13,21 @@ import com.google.common.collect.ImmutableMap; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MMapDirectory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.TestUtils; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; -import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.index.VectorDataType; import org.opensearch.knn.index.engine.KNNEngine; -import java.io.BufferedOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Map; -import static java.nio.file.StandardOpenOption.APPEND; -import static java.nio.file.StandardOpenOption.CREATE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -46,7 +44,7 @@ public void testIndexEntryContext_load() throws IOException { NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( (Directory) null, - "test", + TestUtils.createFakeNativeMamoryCacheKey("test"), indexLoadStrategy, null, "test" @@ -58,8 +56,7 @@ public void testIndexEntryContext_load() throws IOException { 10, KNNEngine.DEFAULT, "test-path", - "test-name", - null + "test-name" ); when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation); @@ -69,36 +66,37 @@ public void testIndexEntryContext_load() throws IOException { public void testIndexEntryContext_calculateSize() throws IOException { // Create a file and write random bytes to it - Path tmpFile = createTempFile(); + final Path tmpDirectory = createTempDir(); + final Directory directory = new MMapDirectory(tmpDirectory); + final String indexFileName = "test.faiss"; byte[] data = new byte[1024 * 3]; Arrays.fill(data, (byte) 'c'); - try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(tmpFile, CREATE, APPEND))) { - out.write(data, 0, data.length); - } catch (IOException x) { - fail("Failed to write to file"); + try (IndexOutput output = directory.createOutput(indexFileName, IOContext.DEFAULT)) { + output.writeBytes(data, data.length); } // Get the expected size of this function - int expectedSize = IndexUtil.getFileSizeInKB(tmpFile.toAbsolutePath().toString()); + final long expectedSizeBytes = directory.fileLength(indexFileName); + final long expectedSizeKb = expectedSizeBytes / 1024L; // Check that the indexEntryContext will return the same thing NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( - (Directory) null, - tmpFile.toAbsolutePath().toString(), + directory, + TestUtils.createFakeNativeMamoryCacheKey(indexFileName), null, null, "test" ); - assertEquals(expectedSize, indexEntryContext.calculateSizeInKB().longValue()); + assertEquals(expectedSizeKb, indexEntryContext.calculateSizeInKB().longValue()); } public void testIndexEntryContext_getOpenSearchIndexName() { String openSearchIndexName = "test-index"; NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( (Directory) null, - "test", + TestUtils.createFakeNativeMamoryCacheKey("test"), null, null, openSearchIndexName @@ -111,7 +109,7 @@ public void testIndexEntryContext_getParameters() { Map parameters = ImmutableMap.of("test-1", 10); NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( (Directory) null, - "test", + TestUtils.createFakeNativeMamoryCacheKey("test"), null, parameters, "test" diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java index 373afddc7c..8236d0518f 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java @@ -28,7 +28,6 @@ import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.knn.training.FloatTrainingDataConsumer; import org.opensearch.knn.training.VectorReader; -import org.opensearch.watcher.ResourceWatcherService; import java.io.IOException; import java.nio.file.Path; @@ -38,10 +37,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; public class NativeMemoryLoadStrategyTests extends KNNTestCase { @@ -66,13 +62,9 @@ public void testIndexLoadStrategy_load() throws IOException { TestUtils.createIndex(ids, memoryAddress, dimension, path, parameters, knnEngine); // Setup mock resource manager - ResourceWatcherService resourceWatcherService = mock(ResourceWatcherService.class); - doReturn(null).when(resourceWatcherService).add(any()); - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); - NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( luceneDirectory, - path, + TestUtils.createFakeNativeMamoryCacheKey(indexName), NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), parameters, "test" @@ -116,13 +108,9 @@ public void testLoad_whenFaissBinary_thenSuccess() throws IOException { TestUtils.createIndex(ids, memoryAddress, dimension, path, parameters, knnEngine); // Setup mock resource manager - ResourceWatcherService resourceWatcherService = mock(ResourceWatcherService.class); - doReturn(null).when(resourceWatcherService).add(any()); - NativeMemoryLoadStrategy.IndexLoadStrategy.initialize(resourceWatcherService); - NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( luceneDirectory, - path, + TestUtils.createFakeNativeMamoryCacheKey(indexName), NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(), parameters, "test" diff --git a/src/testFixtures/java/org/opensearch/knn/TestUtils.java b/src/testFixtures/java/org/opensearch/knn/TestUtils.java index 621688b625..9145bcd161 100644 --- a/src/testFixtures/java/org/opensearch/knn/TestUtils.java +++ b/src/testFixtures/java/org/opensearch/knn/TestUtils.java @@ -22,6 +22,7 @@ import org.opensearch.knn.jni.JNIService; import org.opensearch.knn.plugin.script.KNNScoringUtil; +import java.util.Base64; import java.util.Collections; import java.util.Comparator; import java.util.Random; @@ -184,6 +185,10 @@ public static float[][] getIndexVectors(int docCount, int dimensions, boolean is } } + public static String createFakeNativeMamoryCacheKey(final String fileName) { + return fileName + "@" + Base64.getEncoder().encodeToString(fileName.getBytes()); + } + /* * Recall is the number of relevant documents retrieved by a search divided by the total number of existing relevant documents. * We are similarly calculating recall by verifying number of relevant documents obtained in the search results by comparing with