Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update cache ability to ZPopMax and ZPopMin #2909

Closed
wants to merge 18 commits into from

Conversation

tsinow
Copy link
Collaborator

@tsinow tsinow commented Sep 26, 2024

fix issue #2892 (执行zpopmin后还能查询到被删除数据)
zpopmin与zpopmax作为写操作不会更新到cache值,导致对cache进行读操作的指令能读到被zpopmin删除的值,因此为zpopmin与zpopmax添加了更新cache的功能。

Summary by CodeRabbit

  • New Features

    • Introduced ZPopMin and ZPopMax methods to PikaCache and RedisCache, enabling retrieval and removal of minimum and maximum elements from sorted sets.
    • Enhanced command functionality for ZPopmin and ZPopmax with new database interaction and cache update capabilities.
    • Added DoThroughDB and DoUpdateCache methods to improve command execution and cache management.
  • Bug Fixes

    • Improved cache consistency during element retrieval from sorted sets.
  • Documentation

    • Updated command flags for ZPopmin and ZPopmax to reflect new functionalities.

@github-actions github-actions bot added ✏️ Feature New feature or request 🧹 Updates This will not be worked on labels Sep 26, 2024
Copy link

coderabbitai bot commented Sep 26, 2024

Walkthrough

The pull request introduces new methods to the PikaCache and RedisCache classes for popping elements from sorted sets. Specifically, it adds ZPopMin and ZPopMax methods to both classes, enhancing their ability to manage sorted set elements. Additionally, the ZPopminCmd and ZPopmaxCmd command classes receive new methods for database interaction and cache updates. The command flags for these commands are also updated to reflect their enhanced functionality.

Changes

Files Change Summary
include/pika_cache.h, src/pika_cache.cc, src/cache/include/cache.h, src/cache/src/zset.cc Added ZPopMin and ZPopMax methods to PikaCache and RedisCache classes for popping elements from sorted sets.
include/pika_zset.h Introduced DoThroughDB and DoUpdateCache methods to ZPopmaxCmd and ZPopminCmd classes for enhanced database and cache operations.
src/pika_command.cc Updated command flags for ZPopmaxCmd and ZPopminCmd to include database interaction and cache update capabilities.

Possibly related PRs

Suggested labels

☢️ Bug

Suggested reviewers

  • chejinge
  • AlexStocks
  • Mixficsol

Poem

In the cache where scores do play,
New methods hop and dance today.
ZPopMin and Max, they leap so high,
Popping elements, oh my, oh my!
With commands that twirl and spin,
Our sorted sets now truly win! 🐰✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between a644856 and e4e2769.

📒 Files selected for processing (2)
  • src/cache/src/zset.cc (1 hunks)
  • src/pika_cache.cc (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/cache/src/zset.cc
  • src/pika_cache.cc

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Outside diff range and nitpick comments (8)
src/cache/include/cache.h (1)

151-151: LGTM: ZPopMax method signature is consistent with ZPopMin.

The ZPopMax method signature is well-designed and consistent with ZPopMin, which is excellent for API symmetry and usability.

Consider adding a brief inline comment above each method to describe their functionality, e.g.:

// Removes and returns up to 'count' members with the lowest scores from the sorted set at 'key'
Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

// Removes and returns up to 'count' members with the highest scores from the sorted set at 'key'
Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members);

This would enhance the self-documentation of the code.

include/pika_cache.h (2)

176-176: LGTM: ZPopMin method declaration

The ZPopMin method declaration is well-structured and consistent with other methods in the class. It appropriately uses rocksdb::Status as the return type and includes necessary parameters for the operation.

Consider adding a brief comment explaining the purpose of this method, its behavior, and any side effects it may have on the cache. For example:

/**
 * @brief Removes and returns up to 'count' members with the lowest scores from the sorted set stored at 'key'.
 * @param key The key of the sorted set.
 * @param count The maximum number of members to pop.
 * @param score_members Vector to store the popped members and their scores.
 * @param db Shared pointer to the database for potential fallback or consistency checks.
 * @return rocksdb::Status indicating the success or failure of the operation.
 */
rocksdb::Status ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);

177-177: LGTM: ZPopMax method declaration

The ZPopMax method declaration is well-structured, consistent with ZPopMin, and follows the class's convention for method signatures.

Similar to ZPopMin, consider adding a brief comment explaining the purpose of this method. For example:

/**
 * @brief Removes and returns up to 'count' members with the highest scores from the sorted set stored at 'key'.
 * @param key The key of the sorted set.
 * @param count The maximum number of members to pop.
 * @param score_members Vector to store the popped members and their scores.
 * @param db Shared pointer to the database for potential fallback or consistency checks.
 * @return rocksdb::Status indicating the success or failure of the operation.
 */
rocksdb::Status ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db);
include/pika_zset.h (2)

606-607: LGTM! Consider adding brief comments.

The addition of DoThroughDB() and DoUpdateCache() methods aligns well with the PR objectives to add update cache ability to ZPopMax. The use of the override keyword is a good practice.

Consider adding brief comments above each method to describe their purpose, e.g.:

// Performs the ZPopMax operation through the database
void DoThroughDB() override;

// Updates the cache after a ZPopMax operation
void DoUpdateCache() override;

626-627: LGTM! Consider adding brief comments and maintaining consistent order.

The addition of DoUpdateCache() and DoThroughDB() methods aligns well with the PR objectives to add update cache ability to ZPopMin. The use of the override keyword is a good practice.

  1. Consider adding brief comments above each method to describe their purpose, e.g.:
// Updates the cache after a ZPopMin operation
void DoUpdateCache() override;

// Performs the ZPopMin operation through the database
void DoThroughDB() override;
  1. The order of these methods is different from the ZPopmaxCmd class. For consistency and readability, consider using the same order in both classes:
void DoThroughDB() override;
void DoUpdateCache() override;
src/pika_command.cc (1)

603-607: Consider reviewing other similar commands for consistency

The changes made to ZPopmax and ZPopmin are appropriate and address the specific issue mentioned in the PR objectives. To ensure overall consistency in the codebase, consider the following:

  1. Are there any other commands that modify sorted sets (or other data structures) that might benefit from similar cache update flags?
  2. It might be worthwhile to review the entire command table to identify any other commands that could potentially face similar cache inconsistency issues.

This proactive approach could prevent similar issues from arising with other commands in the future.

src/cache/src/zset.cc (2)

408-433: Ensure atomicity to prevent race conditions

The current implementation of ZPopMin fetches items and then removes them in separate steps. In a concurrent environment, other operations might modify the sorted set between these steps, leading to inconsistencies or race conditions. Consider implementing an atomic pop operation if available, or introduce locking mechanisms to ensure thread safety.


435-462: Ensure atomicity to prevent race conditions

Similar to ZPopMin, the ZPopMax function should ensure atomicity to avoid inconsistencies in concurrent scenarios. Implement an atomic operation or appropriate locking to maintain thread safety.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 5431d0c and 07e3580.

📒 Files selected for processing (6)
  • include/pika_cache.h (2 hunks)
  • include/pika_zset.h (2 hunks)
  • src/cache/include/cache.h (1 hunks)
  • src/cache/src/zset.cc (1 hunks)
  • src/pika_cache.cc (3 hunks)
  • src/pika_command.cc (1 hunks)
🔇 Additional comments not posted (6)
src/cache/include/cache.h (2)

150-150: LGTM: ZPopMin method signature is well-designed.

The ZPopMin method signature is appropriate and aligns with the PR objectives. It provides the necessary parameters for efficient operation on sorted sets and follows the class's convention for error handling.


150-151: Summary: Excellent addition of ZPopMin and ZPopMax methods.

The addition of ZPopMin and ZPopMax methods to the RedisCache class is well-implemented and directly addresses the PR objectives. These methods will enable updating the cache when removing elements from sorted sets, resolving the issue of querying deleted data after executing these operations.

Key points:

  1. The method signatures are consistent with other Zset operations in the class.
  2. The methods are placed logically within the Zset Commands section.
  3. The changes are minimal and focused, reducing the risk of introducing bugs.
  4. The new methods complete the set of operations for sorted sets, allowing for efficient removal of elements based on their scores.

These additions will significantly improve the integrity and accuracy of data retrieval in the context of ZPopMin and ZPopMax operations.

include/pika_cache.h (2)

19-19: LGTM: Necessary include for rocksdb::Status

The addition of #include "rocksdb/status.h" is appropriate and necessary for the new method declarations that use rocksdb::Status. The placement of the include statement is also correct, following other project-specific includes.


176-177: Verify implementation of cache updates

The new method declarations for ZPopMin and ZPopMax are in line with the PR objectives to add update cache ability for these operations. This should address the issue of querying deleted data after executing these commands (issue #2892).

To ensure that the cache update functionality is properly implemented, please verify the corresponding .cpp file where these methods are defined. The implementation should include logic to update the cache after popping elements from the sorted set.

Run the following script to locate and examine the implementation:

Please ensure that the implementation includes proper cache update logic after modifying the sorted set.

✅ Verification successful

Further Verification Required

The previous attempt to locate the ZPopMin and ZPopMax implementations did not yield the necessary information. To accurately verify the cache update logic, additional investigation is needed.

Please run the above script to identify the exact locations and verify the implementation details of ZPopMin and ZPopMax.


Cache Update Implementation Verified

The implementations of ZPopMin and ZPopMax in src/pika_cache.cc include the necessary logic to update the cache after modifying the sorted set. This confirms that the issue of querying deleted data after executing these commands (issue #2892) has been addressed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Locate and display the implementation of ZPopMin and ZPopMax

# Find the corresponding .cpp file
implementation_file=$(fd -e cpp -e cc pika_cache)

if [ -z "$implementation_file" ]; then
  echo "Implementation file not found."
  exit 1
fi

echo "Implementation file found: $implementation_file"

# Display the implementation of ZPopMin and ZPopMax
echo "ZPopMin implementation:"
rg -A 20 "ZPopMin\s*\(" "$implementation_file"

echo "ZPopMax implementation:"
rg -A 20 "ZPopMax\s*\(" "$implementation_file"

Length of output: 765


Script:

#!/bin/bash
# Description: Locate and display the implementation of ZPopMin and ZPopMax in each .cpp file separately

# Find all corresponding .cpp files
implementation_files=$(fd -e cpp -e cc pika_cache)

if [ -z "$implementation_files" ]; then
  echo "No implementation files found."
  exit 1
fi

echo "Implementation files found:"
echo "$implementation_files"

# Iterate through each file and search for ZPopMin and ZPopMax implementations
for file in $implementation_files; do
  echo "Searching in $file:"
  
  echo "ZPopMin implementation:"
  rg -A 20 "ZPopMin\s*\(" "$file" || echo "ZPopMin not found in $file"
  
  echo "ZPopMax implementation:"
  rg -A 20 "ZPopMax\s*\(" "$file" || echo "ZPopMax not found in $file"
  
  echo "----------------------------------------"
done

Length of output: 3712

src/pika_command.cc (2)

603-603: Approved: Enhanced ZPopmax command with cache update capability

The changes to the ZPopmax command are appropriate and align with the PR objectives. By adding the kCmdFlagsDoThroughDB and kCmdFlagsUpdateCache flags, the command will now:

  1. Ensure execution through the database layer.
  2. Update the cache after modifying the data.

These modifications address the issue of potential data inconsistency between the cache and the actual dataset after pop operations.


607-607: Approved: ZPopmin command updated consistently with ZPopmax

The modifications to the ZPopmin command mirror those made to ZPopmax, which is the correct approach. By adding the kCmdFlagsDoThroughDB and kCmdFlagsUpdateCache flags, we ensure that:

  1. The command is executed through the database layer.
  2. The cache is updated after the pop operation.

This change directly addresses the issue mentioned in the PR objectives, preventing the possibility of querying deleted data after a zpopmin operation. The consistency between ZPopmax and ZPopmin is commendable, as it maintains a uniform behavior for similar operations.

Comment on lines +606 to +607
void DoThroughDB() override;
void DoUpdateCache() override;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing Implementations for DoThroughDB() and DoUpdateCache()

It appears that the DoThroughDB() and DoUpdateCache() methods are declared in include/pika_zset.h for both ZPopmaxCmd and ZPopminCmd classes but lack corresponding implementations in src/pika_zset.cc. This discrepancy may lead to linker errors or undefined behavior when these methods are invoked.

🔗 Analysis chain

Please provide the implementation details for verification.

The changes in the header file look good and align with the PR objectives. However, to fully verify the correctness of the implementation, we need to see the corresponding .cpp file where DoThroughDB() and DoUpdateCache() methods are defined for both ZPopmaxCmd and ZPopminCmd classes.

Could you please provide the implementation details? Specifically, we need to verify:

  1. The cache update logic in DoUpdateCache() for both commands.
  2. The database interaction in DoThroughDB() for both commands.
  3. How these methods ensure that deleted data is no longer accessible through cache reads after executing zpopmin or zpopmax.

To help with this verification, you can run the following script to find and display the relevant implementation files:

This script will help us locate and examine the implementation details, allowing for a more thorough review of the cache update logic and database interactions.

Also applies to: 626-627

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find and display the implementation files for ZPopmaxCmd and ZPopminCmd

echo "Searching for ZPopmaxCmd and ZPopminCmd implementations..."

# Search for ZPopmaxCmd implementation
echo "ZPopmaxCmd implementation:"
rg -p "ZPopmaxCmd::Do|ZPopmaxCmd::DoThroughDB|ZPopmaxCmd::DoUpdateCache" --type cpp

echo "\nZPopminCmd implementation:"
# Search for ZPopminCmd implementation
rg -p "ZPopminCmd::Do|ZPopminCmd::DoThroughDB|ZPopminCmd::DoUpdateCache" --type cpp

echo "\nDisplaying the content of the files found (if any):"
rg -p "ZPopmaxCmd::Do|ZPopmaxCmd::DoThroughDB|ZPopmaxCmd::DoUpdateCache|ZPopminCmd::Do|ZPopminCmd::DoThroughDB|ZPopminCmd::DoUpdateCache" --type cpp -A 20

Length of output: 3361

Comment on lines 416 to 418
if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential memory leak when RcZrange fails

In ZPopMin, if RcZrange returns an error, the allocated items may not be freed, leading to a memory leak.

Apply this diff to ensure items are freed upon failure:

     if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
+        if (items != nullptr) {
+            FreeZitemList(items, items_size);
+        }
         return Status::Corruption("Failed to get items");
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
if (items != nullptr) {
FreeZitemList(items, items_size);
}
return Status::Corruption("Failed to get items");
}

Comment on lines 445 to 447
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential memory leak when RcZrange fails

Similarly, in ZPopMax, ensure that items are freed if RcZrange fails to prevent memory leaks.

Apply this diff:

     if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
+        if (items != nullptr) {
+            FreeZitemList(items, items_size);
+        }
         return Status::Corruption("Failed to get items");
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
if (items != nullptr) {
FreeZitemList(items, items_size);
}
return Status::Corruption("Failed to get items");
}

Comment on lines 435 to 462
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

long start = -count;
long end = -1;
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}

for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);

robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
RcZRem(cache_, kobj, &member_obj, 1);
DecrObjectsRefCount(member_obj);
}

FreeZitemList(items, items_size);
return Status::OK();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize batch removal of members to improve performance

In the ZPopMax function, similar to ZPopMin, members are removed individually within a loop. Batching the removals can improve performance by reducing the number of function calls.

Apply this diff to modify the code:

 Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
     zitem* items = nullptr;
     unsigned long items_size = 0;
     robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
     DEFER {
         DecrObjectsRefCount(kobj);
     };

     long start = -count;
     long end = -1;
     if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
+        if (items != nullptr) {
+            FreeZitemList(items, items_size);
+        }
         return Status::Corruption("Failed to get items");
     }

+    robj** member_objs = (robj**)zcallocate(sizeof(robj*) * items_size);
     for (unsigned long i = 0; i < items_size; ++i) {
         storage::ScoreMember sm;
         sm.score = items[i].score;
         sm.member.assign(items[i].member, sdslen(items[i].member));
         score_members->push_back(sm);

-        robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
-        RcZRem(cache_, kobj, &member_obj, 1);
-        DecrObjectsRefCount(member_obj);
+        member_objs[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
     }

+    int ret = RcZRem(cache_, kobj, member_objs, items_size);
+    if (ret != C_OK) {
+        FreeObjectList(member_objs, items_size);
+        FreeZitemList(items, items_size);
+        return Status::Corruption("RcZRem failed");
+    }
+    FreeObjectList(member_objs, items_size);

     FreeZitemList(items, items_size);
     return Status::OK();
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
long start = -count;
long end = -1;
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
RcZRem(cache_, kobj, &member_obj, 1);
DecrObjectsRefCount(member_obj);
}
FreeZitemList(items, items_size);
return Status::OK();
}
Status RedisCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
long start = -count;
long end = -1;
if (RcZrange(cache_, kobj, start, end, &items, &items_size) != C_OK) {
if (items != nullptr) {
FreeZitemList(items, items_size);
}
return Status::Corruption("Failed to get items");
}
robj** member_objs = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
member_objs[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
int ret = RcZRem(cache_, kobj, member_objs, items_size);
if (ret != C_OK) {
FreeObjectList(member_objs, items_size);
FreeZitemList(items, items_size);
return Status::Corruption("RcZRem failed");
}
FreeObjectList(member_objs, items_size);
FreeZitemList(items, items_size);
return Status::OK();
}

Comment on lines 408 to 433
Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};

if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}

for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);

robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
RcZRem(cache_, kobj, &member_obj, 1);
DecrObjectsRefCount(member_obj);
}

FreeZitemList(items, items_size);
return Status::OK();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize batch removal of members to improve performance

In the ZPopMin function, members are being removed individually within a loop by calling RcZRem for each item. This could lead to performance issues due to multiple function calls and overhead. To enhance performance, consider batching the removal of all members in a single call to RcZRem.

Apply this diff to modify the code:

 Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
     zitem* items = nullptr;
     unsigned long items_size = 0;
     robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
     DEFER {
         DecrObjectsRefCount(kobj);
     };

     if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
+        if (items != nullptr) {
+            FreeZitemList(items, items_size);
+        }
         return Status::Corruption("Failed to get items");
     }

+    robj** member_objs = (robj**)zcallocate(sizeof(robj*) * items_size);
     for (unsigned long i = 0; i < items_size; ++i) {
         storage::ScoreMember sm;
         sm.score = items[i].score;
         sm.member.assign(items[i].member, sdslen(items[i].member));
         score_members->push_back(sm);

-        robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
-        RcZRem(cache_, kobj, &member_obj, 1);
-        DecrObjectsRefCount(member_obj);
+        member_objs[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
     }

+    int ret = RcZRem(cache_, kobj, member_objs, items_size);
+    if (ret != C_OK) {
+        FreeObjectList(member_objs, items_size);
+        FreeZitemList(items, items_size);
+        return Status::Corruption("RcZRem failed");
+    }
+    FreeObjectList(member_objs, items_size);

     FreeZitemList(items, items_size);
     return Status::OK();
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
return Status::Corruption("Failed to get items");
}
for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
robj* member_obj = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
RcZRem(cache_, kobj, &member_obj, 1);
DecrObjectsRefCount(member_obj);
}
FreeZitemList(items, items_size);
return Status::OK();
}
Status RedisCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members) {
zitem* items = nullptr;
unsigned long items_size = 0;
robj* kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
};
if (RcZrange(cache_, kobj, 0, count - 1, &items, &items_size) != C_OK) {
if (items != nullptr) {
FreeZitemList(items, items_size);
}
return Status::Corruption("Failed to get items");
}
robj** member_objs = (robj**)zcallocate(sizeof(robj*) * items_size);
for (unsigned long i = 0; i < items_size; ++i) {
storage::ScoreMember sm;
sm.score = items[i].score;
sm.member.assign(items[i].member, sdslen(items[i].member));
score_members->push_back(sm);
member_objs[i] = createObject(OBJ_STRING, sdsnewlen(items[i].member, sdslen(items[i].member)));
}
int ret = RcZRem(cache_, kobj, member_objs, items_size);
if (ret != C_OK) {
FreeObjectList(member_objs, items_size);
FreeZitemList(items, items_size);
return Status::Corruption("RcZRem failed");
}
FreeObjectList(member_objs, items_size);
FreeZitemList(items, items_size);
return Status::OK();
}

Comment on lines 1490 to 1507
Status PikaCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMax(key, count, score_members);
if (s.ok()) {
cache_obj->ZAdd(key, *score_members);
}
return s;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Logical Error: Adding Popped Elements Back to the Cache in ZPopMax

Similarly, in the ZPopMax method, the code adds the elements popped from the database back into the cache when the key does not exist in the cache. This can cause inconsistencies, as we are reintroducing elements that have been deleted. The cache should reflect the current state of the database after the ZPopMax operation.

Consider removing the line that adds the popped elements back to the cache:

         if (s.ok()) {
-            cache_obj->ZAdd(key, *score_members);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status PikaCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
Status s;
if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMax(key, count, score_members);
if (s.ok()) {
cache_obj->ZAdd(key, *score_members);
}
return s;
}
}
Status PikaCache::ZPopMax(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
Status s;
if (cache_obj->Exists(key)) {
return cache_obj->ZPopMax(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMax(key, count, score_members);
if (s.ok()) {
// Removed: cache_obj->ZAdd(key, *score_members);
}
return s;
}
}

Comment on lines 1471 to 1488
Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);

auto cache_obj = caches_[cache_index];
Status s;

if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMin(key, count, score_members);
if (s.ok()) {
cache_obj->ZAdd(key, *score_members);
}
return s;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Logical Error: Adding Popped Elements Back to the Cache in ZPopMin

In the ZPopMin method, when the key does not exist in the cache, after removing elements from the database using db->storage()->ZPopMin, the code adds these popped elements back into the cache with cache_obj->ZAdd(key, *score_members);. This contradicts the expected behavior because the elements have been removed from the database and should not be reintroduced into the cache. This can lead to inconsistencies between the cache and the database.

Consider removing the line that adds the popped elements back to the cache:

         if (s.ok()) {
-            cache_obj->ZAdd(key, *score_members);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
Status s;
if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMin(key, count, score_members);
if (s.ok()) {
cache_obj->ZAdd(key, *score_members);
}
return s;
}
}
Status PikaCache::ZPopMin(std::string& key, int64_t count, std::vector<storage::ScoreMember>* score_members, const std::shared_ptr<DB>& db) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
auto cache_obj = caches_[cache_index];
Status s;
if (cache_obj->Exists(key)) {
return cache_obj->ZPopMin(key, count, score_members);
} else {
score_members->clear();
s = db->storage()->ZPopMin(key, count, score_members);
if (s.ok()) {
// Removed: cache_obj->ZAdd(key, *score_members);
}
return s;
}
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
src/pika_zset.cc (1)

Line range hint 1510-1565: Overall implementation is good, with one important fix needed.

The changes successfully add cache update ability to ZPopMax and ZPopMin commands, which aligns with the PR objective. The implementation is generally correct and consistent. However, there's an important fix needed in the ZPopmaxCmd::DoUpdateCache method, where ZPopMin is incorrectly used instead of ZPopMax. Once this is addressed, the implementation will fully meet the requirements.

Please ensure the fix for ZPopmaxCmd::DoUpdateCache is implemented as suggested in the previous comment.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 82e3e85 and 079d11b.

📒 Files selected for processing (1)
  • src/pika_zset.cc (2 hunks)
🔇 Additional comments not posted (3)
src/pika_zset.cc (3)

1510-1512: Implementation looks good.

The DoThroughDB method correctly delegates to the Do method, maintaining the existing functionality while allowing for the new cache update ability.


1556-1558: Implementation looks good.

The DoThroughDB method correctly delegates to the Do method, maintaining the existing functionality while allowing for the new cache update ability.


1560-1565: Implementation looks good.

The DoUpdateCache method correctly updates the cache by calling ZPopMin with the appropriate parameters. The condition for updating the cache is correct, ensuring consistency between the storage and cache layers.

src/pika_zset.cc Outdated
Comment on lines 1514 to 1519
void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect cache update method call.

The DoUpdateCache method is calling ZPopMin instead of ZPopMax. This is inconsistent with the command's purpose and may lead to incorrect cache updates.

Please update the cache method call as follows:

-      db_->cache()->ZPopMin(key_, count_, &score_members, db_);
+      db_->cache()->ZPopMax(key_, count_, &score_members, db_);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMin(key_, count_, &score_members, db_);
}
}
void ZPopmaxCmd::DoUpdateCache(){
std::vector<storage::ScoreMember> score_members;
if(s_.ok() || s_.IsNotFound()){
db_->cache()->ZPopMax(key_, count_, &score_members, db_);
}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
✏️ Feature New feature or request 🧹 Updates This will not be worked on
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants