Skip to content

Commit

Permalink
remove locks and make all methods synchronized
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Mar 13, 2024
1 parent 96a6ef7 commit cb08de0
Showing 1 changed file with 49 additions and 82 deletions.
131 changes: 49 additions & 82 deletions server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.ToLongBiFunction;

Expand Down Expand Up @@ -429,18 +426,13 @@ class IndicesRequestCacheCleanupManager implements Closeable {
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private final double stalenessThreshold;
private final Lock readLock;
private final Lock writeLock;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool,TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.staleKeysCount = new AtomicInteger(0);
ReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
}
Expand Down Expand Up @@ -484,7 +476,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
.merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
private synchronized void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0) {
return;
}
Expand All @@ -495,15 +487,10 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

writeLock.lock();
try {
// If the key doesn't exist, ignore
ConcurrentMap<String, Integer> keyCountMap = cleanupKeyToCountMap.get(shardId);
if (keyCountMap != null) {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1);
}
} finally {
writeLock.unlock();
// If the key doesn't exist, ignore
ConcurrentMap<String, Integer> keyCountMap = cleanupKeyToCountMap.get(shardId);
if (keyCountMap != null) {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, value) -> value - 1);
}
}

Expand Down Expand Up @@ -576,41 +563,36 @@ private void cleanCache(double threshold) {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning Indices Request Cache with threshold : " + threshold);
}
writeLock.lock();
try {
if (canSkipCacheCleanup(threshold)) {
return;
}
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
// Contains CleanupKey objects of a closed shard.
final Set<Object> cleanupKeysFromClosedShards = new HashSet<>();

for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) {
// null indicates full cleanup, as does a closed shard
cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId());
} else {
cleanupKeysFromOutdatedReaders.add(cleanupKey);
}
if (canSkipCacheCleanup(threshold)) {
return;
}
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
// Contains CleanupKey objects of a closed shard.
final Set<Object> cleanupKeysFromClosedShards = new HashSet<>();

for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext(); ) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) {
// null indicates full cleanup, as does a closed shard
cleanupKeysFromClosedShards.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId());
} else {
cleanupKeysFromOutdatedReaders.add(cleanupKey);
}
}

if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) {
return;
}
if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) {
return;
}

for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext();) {
Key key = iterator.next();
if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) {
iterator.remove();
}
for (Iterator<Key> iterator = cache.keys().iterator(); iterator.hasNext(); ) {
Key key = iterator.next();
if (shouldRemoveKey(key, cleanupKeysFromOutdatedReaders, cleanupKeysFromClosedShards)) {
iterator.remove();
}
cache.refresh();
} finally {
writeLock.unlock();
}
cache.refresh();
}

/**
Expand All @@ -626,17 +608,12 @@ private void cleanCache(double threshold) {
* @param cleanupKeysFromClosedShards A set of CleanupKeys of a closed shard.
* @return true if the key should be removed, false otherwise.
*/
private boolean shouldRemoveKey(Key key, Set<CleanupKey> cleanupKeysFromOutdatedReaders, Set<Object> cleanupKeysFromClosedShards) {
readLock.lock();
try {
if (cleanupKeysFromClosedShards.contains(key.shardId)) {
return true;
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
return cleanupKeysFromOutdatedReaders.contains(cleanupKey);
}
} finally {
readLock.unlock();
private synchronized boolean shouldRemoveKey(Key key, Set<CleanupKey> cleanupKeysFromOutdatedReaders, Set<Object> cleanupKeysFromClosedShards) {
if (cleanupKeysFromClosedShards.contains(key.shardId)) {
return true;
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
return cleanupKeysFromOutdatedReaders.contains(cleanupKey);
}
}

Expand All @@ -649,23 +626,18 @@ private boolean shouldRemoveKey(Key key, Set<CleanupKey> cleanupKeysFromOutdated
* @param cleanThresholdPercent The staleness threshold as a percentage.
* @return true if the cache cleanup process can be skipped, false otherwise.
*/
private boolean canSkipCacheCleanup(double cleanThresholdPercent) {
private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) {
if (cleanThresholdPercent == 0.0) {
return false;
}
readLock.lock();
try {
if (staleKeysInCachePercentage() < cleanThresholdPercent) {
if (logger.isDebugEnabled()) {
logger.debug(
"Skipping cache cleanup since the percentage of stale keys is less than the threshold : "
+ stalenessThreshold
);
}
return true;
if (staleKeysInCachePercentage() < cleanThresholdPercent) {
if (logger.isDebugEnabled()) {
logger.debug(
"Skipping cache cleanup since the percentage of stale keys is less than the threshold : "
+ stalenessThreshold
);
}
} finally {
readLock.unlock();
return true;
}
return false;
}
Expand All @@ -675,17 +647,12 @@ private boolean canSkipCacheCleanup(double cleanThresholdPercent) {
*
* @return The percentage of stale keys in the cache as a double. Returns 0 if there are no keys in the cache or no stale keys.
*/
private double staleKeysInCachePercentage() {
readLock.lock();
try {
long totalKeysInCache = count();
if (totalKeysInCache == 0 || staleKeysCount.get() == 0) {
return 0;
}
return ((double) staleKeysCount.get() / totalKeysInCache);
} finally {
readLock.unlock();
private synchronized double staleKeysInCachePercentage() {
long totalKeysInCache = count();
if (totalKeysInCache == 0 || staleKeysCount.get() == 0) {
return 0;
}
return ((double) staleKeysCount.get() / totalKeysInCache);
}

@Override
Expand Down

0 comments on commit cb08de0

Please sign in to comment.