Skip to content

Commit

Permalink
Testing
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Mar 14, 2024
1 parent dbda9bf commit 5676922
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,10 @@ private void updateStaleKeysCount(CleanupKey cleanupKey) {
});
}

AtomicInteger getStaleKeysCountForTesting() {
return staleKeysCount;
}

/**
* Clean cache based on stalenessThreshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.module.CacheModule;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -76,6 +78,7 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -467,6 +470,152 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th
terminate(threadPool);
}

public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexShard indexShard = createIndex("test").getShard(0);
ThreadPool threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build();
IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
if (randomBoolean()) {
writer.flush();
IOUtils.close(writer);
writer = new IndexWriter(dir, newIndexWriterConfig());
}
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));

// Get 2 entries into the cache
IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
Loader loader = new Loader(reader, 0);
cache.getOrCompute(entity, loader, reader, termBytes);

entity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(reader, 0);
cache.getOrCompute(entity, loader, reader, termBytes);

IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(secondReader, 0);
cache.getOrCompute(entity, loader, secondReader, termBytes);

secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(secondReader, 0);
cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
assertEquals(2, cache.count());

// Close the reader, to be enqueued for cleanup
reader.close();
AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting();
// 1 out of 2 keys ie 50% are now stale.
assertEquals(1, staleKeysCount.get());
// cache count should not be affected
assertEquals(2, cache.count());

OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) secondReader
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
IndicesRequestCache.Key key = new IndicesRequestCache.Key(
((IndexShard) secondEntity.getCacheIdentity()).shardId(),
termBytes,
readerCacheKeyId
);

cache.onRemoval(new RemovalNotification<IndicesRequestCache.Key, BytesReference>(key, termBytes, RemovalReason.EVICTED));
staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting();
// eviction of previous stale key from the cache should decrement staleKeysCount in iRC
assertEquals(0, staleKeysCount.get());

IOUtils.close(secondReader, writer, dir, cache);
terminate(threadPool);
}

public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexShard indexShard = createIndex("test").getShard(0);
ThreadPool threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build();
IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}), new CacheModule(new ArrayList<>(), settings).getCacheService(), threadPool);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
if (randomBoolean()) {
writer.flush();
IOUtils.close(writer);
writer = new IndexWriter(dir, newIndexWriterConfig());
}
writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));

// Get 2 entries into the cache
IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
Loader loader = new Loader(reader, 0);
cache.getOrCompute(entity, loader, reader, termBytes);

entity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(reader, 0);
cache.getOrCompute(entity, loader, reader, termBytes);

IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(secondReader, 0);
cache.getOrCompute(entity, loader, secondReader, termBytes);

secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
loader = new Loader(secondReader, 0);
cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
assertEquals(2, cache.count());

// Close the reader, to be enqueued for cleanup
reader.close();
AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting();
// 1 out of 2 keys ie 50% are now stale.
assertEquals(1, staleKeysCount.get());
// cache count should not be affected
assertEquals(2, cache.count());

OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
IndicesRequestCache.Key key = new IndicesRequestCache.Key(
((IndexShard) secondEntity.getCacheIdentity()).shardId(),
termBytes,
readerCacheKeyId
);

cache.onRemoval(new RemovalNotification<IndicesRequestCache.Key, BytesReference>(key, termBytes, RemovalReason.EVICTED));
staleKeysCount = cache.cacheCleanupManager.getStaleKeysCountForTesting();
// eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC
assertEquals(1, staleKeysCount.get());

IOUtils.close(secondReader, writer, dir, cache);
terminate(threadPool);
}

public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexShard indexShard = createIndex("test").getShard(0);
Expand Down

0 comments on commit 5676922

Please sign in to comment.