
[server][common] Fixed bug in AA/WC parallel processing support #1252

Open · wants to merge 4 commits into base: main

Conversation

gaojieliu (Contributor, Author)

Summary

This PR fixes the following issues:

  1. AASIT should pass a non-null `KeyLevelLocksManager` to `IngestionBatchProcessor`; otherwise, a race condition can happen.
  2. Fixed the locking order in `IngestionBatchProcessor` to avoid deadlock.
  3. Updated `SparseConcurrentList#computeIfAbsent` to skip adjusting the list size if the computed result is `null`.
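
To make fix 3 concrete, here is a minimal, hedged sketch of the intended behavior. It is a simplified, single-threaded stand-in, not the actual `SparseConcurrentList` implementation (which also handles concurrency), and the class name is illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Simplified stand-in for the computeIfAbsent fix:
// when the mapping function returns null, the list size is left untouched.
public class SparseListSketch<T> {
  private final List<T> list = new ArrayList<>();

  public T computeIfAbsent(int index, IntFunction<? extends T> mappingFunction) {
    T existing = index < list.size() ? list.get(index) : null;
    if (existing != null) {
      return existing;
    }
    T computed = mappingFunction.apply(index);
    if (computed == null) {
      // The fix: skip adjusting the list size when the computed result is null.
      return null;
    }
    while (list.size() <= index) {
      list.add(null); // grow sparsely up to the requested index
    }
    list.set(index, computed);
    return computed;
  }
}
```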

How was this PR tested?

CI

Does this PR introduce any user-facing changes?

  • No.

```java
 * are trying to lock the same set of keys with different orders, deadlock can happen.
 */
Map<ByteArrayKey, ReentrantLock> keyLockMap =
    new TreeMap<>((o1, o2) -> ByteUtils.compare(o1.getContent(), o2.getContent()));
```
Contributor:

Can we compare with `ByteArrayKey::hashCode()`? It should be faster than comparing byte arrays.

Contributor Author (gaojieliu):

A hash collision could become a disaster, which might result in deadlock.

Contributor:

I'm not sure where the current `ByteArrayKey` hashCode is being used, but the current hash generation looks too simple; it does not use SHA or MD5, so it might run into hash collisions more frequently.
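
To spell out the deadlock argument, here is a hedged sketch with hypothetical helper names (not the PR's actual classes): a byte-wise comparator gives a single total order over distinct keys, so every thread acquires overlapping locks in the same sequence, whereas a `hashCode()`-based order would treat colliding keys as equal and lose that guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch: lock a set of keys in one global order so that two
// threads locking overlapping key sets can never deadlock.
public final class OrderedKeyLocking {
  // Byte-wise comparison is a total order over distinct keys; hashCode() is not,
  // since two distinct keys can collide and would then compare as "equal".
  private static final Comparator<byte[]> KEY_ORDER = Arrays::compare;

  public static void runWithLocks(List<byte[]> keys, Function<byte[], ReentrantLock> lockSupplier, Runnable action) {
    // TreeMap iterates in comparator order, i.e. the global lock-acquisition order.
    Map<byte[], ReentrantLock> keyLockMap = new TreeMap<>(KEY_ORDER);
    for (byte[] key : keys) {
      keyLockMap.put(key, lockSupplier.apply(key));
    }
    List<ReentrantLock> acquired = new ArrayList<>();
    try {
      for (ReentrantLock lock : keyLockMap.values()) {
        lock.lock();
        acquired.add(lock);
      }
      action.run();
    } finally {
      // Release everything we managed to acquire, in reverse order.
      for (int i = acquired.size() - 1; i >= 0; i--) {
        acquired.get(i).unlock();
      }
    }
  }
}
```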

Comment on lines +108 to +109:

```java
lock.unlock();
lockManager.releaseLock(key);
```

FelixGV (Contributor), Oct 18, 2024:

This is pre-existing code, not something you are changing in this PR, but I wonder... why did we design the KeyLevelLocksManager in this way? IIUC, the lock.unlock() unlocks (duh) and then the lockManager.releaseLock(key) decreases the ref count and potentially returns the lock to the pool... Is there ever a situation where we would wish to do one of these operations but not the other? At first glance, it seems to me that the only correct way to use this API is to do both of these calls. So why aren't they fused into a single function? IOW, why do I need to handle the ReentrantLock object at all? Why not make the unlock a side-effect of the lockManager.releaseLock(key) call?

And likewise for the locking, why does lockManager.acquireLockByKey(k) return a lock object? Why not make it a void function that has the side-effect of locking? It seems that we would need to carry less state around, and the chance of misuse would be reduced...

Contributor Author (gaojieliu):

Yeah, this is reasonable.
@huangminchn Can you chime in here regarding the above question?

Contributor Author (gaojieliu):

Fusing the two calls into one should work, and we do have cases that call lock/unlock multiple times in the same thread, so maintaining a counter will be beneficial.
When this feature is enabled, the consumer will lock a batch of keys, and to stay backward compatible, the existing function will lock the same key again:
https://github.com/linkedin/venice/blob/main/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java#L225

As you mentioned, we can create a wrapper around the lock to handle the reference-counting logic.
So far, I don't see a case that would call only one of the two functions.
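
A hedged sketch of what such a fused, reference-counted API could look like. The class and method names are hypothetical and this is not the existing `KeyLevelLocksManager` (which manages a bounded lock pool); it only illustrates why callers would no longer need to handle the `ReentrantLock` directly:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a "fused" key-level lock manager: callers never touch
// the ReentrantLock, so lock/unlock and ref-count bookkeeping cannot drift apart.
public final class FusedKeyLockManagerSketch<K> {
  private static final class Entry {
    final ReentrantLock lock = new ReentrantLock();
    int refCount; // mutated only inside the map's atomic compute()
  }

  private final Map<K, Entry> entries = new ConcurrentHashMap<>();

  /** Locks the key; the same thread may acquire the same key again (reentrant). */
  public void acquire(K key) {
    Entry entry = entries.compute(key, (k, e) -> {
      if (e == null) {
        e = new Entry();
      }
      e.refCount++;
      return e;
    });
    entry.lock.lock();
  }

  /** Unlocks the key and drops the reference in a single call. */
  public void release(K key) {
    Entry entry = entries.get(key);
    if (entry == null) {
      throw new IllegalStateException("release() without acquire() for key: " + key);
    }
    entry.lock.unlock();
    // Remove the entry once nobody references it any more.
    entries.compute(key, (k, e) -> (e == null || --e.refCount == 0) ? null : e);
  }
}
```

Call sites would then pair `acquire(key)`/`release(key)` in a try/finally, or the manager could hand back an `AutoCloseable` for try-with-resources.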

```java
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message = r.getMessage();
if (!message.getKey().isControlMessage()) {
for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: records) {
  if (!message.getTopicPartition().getPubSubTopic().isRealTime()) {
```

Contributor:

Should we also check for non-control messages here?

Contributor Author (gaojieliu):

Hmm, I don't think there is a need; we don't have a use case where the control message comes from the VT but the actual messages come from the RT, right?

FelixGV (Contributor) left a comment:

Sorry, I am doing mostly white line commenting in this round of review. I should have reviewed the first round of implementation of this functionality, but didn't get a chance, so I'm coming to the party a bit late. I just want to provide the following feedback:

The part of produceToStoreBufferServiceOrKafkaInBatch which splits the records coming from Kafka into "mini batches" seems not ideal to me, and I feel like we should get rid of it. To be clear, I'm not saying the current code cannot work, just that it is more complex than it should be. I will list below some issues/concerns that I see with it; all of these could be fixed individually, but I still think getting rid of that part of the algorithm would be better overall:

  1. The batch size comes from serverConfig.getAAWCWorkloadParallelProcessingThreadPoolSize(), which does not seem to make sense. The default value of that config is 8, which is likely too small. I think the intent was probably to have a separate config for the batch size.
  2. If the Kafka batch is smaller than the configured batch size, then we still instantiate an ongoingBatch of the configured size, which is wasteful in terms of memory allocation (easy to fix, but is it needed?).
  3. I expect that the batch size coming from Kafka should not be too large to handle, and so there is no point in splitting it. If anything, I would think there may be cases where the batch from Kafka is too small and that the optimal thing may be to accumulate records for some time and then create a mega batch (the opposite of a mini batch!).
  4. One of the benefits we hope to get from this functionality is to process together the updates to the same key, so that we do fewer RocksDB reads and writes in those cases. This effect is reduced by the mini batches.

So my recommendation would be to simplify this code by deleting the mini batching altogether, and just pass the Kafka batch directly into the next step. No need to implement the mega batching yet, we can make that call later.
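
For illustration only, a hedged sketch of the two shapes being compared. The types, sizes, and method names below are stand-ins, not the actual `produceToStoreBufferServiceOrKafkaInBatch` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public final class MiniBatchSketch {
  // Current shape (simplified): chunk the consumed records into mini batches
  // whose size is borrowed from the parallel-processing thread pool config.
  static <R> void processInMiniBatches(List<R> records, int miniBatchSize, Consumer<List<R>> processor) {
    List<R> ongoingBatch = new ArrayList<>(miniBatchSize);
    for (R record : records) {
      ongoingBatch.add(record);
      if (ongoingBatch.size() == miniBatchSize) {
        processor.accept(ongoingBatch);
        ongoingBatch = new ArrayList<>(miniBatchSize);
      }
    }
    if (!ongoingBatch.isEmpty()) {
      processor.accept(ongoingBatch);
    }
  }

  // Proposed shape: hand the whole consumed batch to the next step unchanged.
  static <R> void processWholeBatch(List<R> records, Consumer<List<R>> processor) {
    processor.accept(records);
  }
}
```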

gaojieliu (Contributor Author), replying to the review comment above:

The biggest reason for mini-batching is to make the key-level lock usage deterministic.
If we always pass everything we received to IngestionBatchProcessor, the KeyLevelLocksManager will need to account for the max possible batch size, which is 100 IIRC.
4) is not implemented yet, so the smaller batch size in theory shouldn't affect performance much.
When working on 4), we will need a separate config to control the batch size, so that we can tune the memory usage against the performance benefits gained by de-duping.

For 2), today's interface is Iterable, so we can't get the size without iterating it; to avoid the unnecessary copy, we would need to change the interface to something like List.

```diff
@@ -1256,7 +1257,7 @@ protected void produceToStoreBufferServiceOrKafkaInBatch(
  * Process records batch by batch.
```

FelixGV (Contributor), Oct 18, 2024:

I'm going to start a proper thread, since otherwise tracking replies in the top-level "Conversation" tab is tedious... pasting the previous discussion here:

> (FelixGV's review comment above, pasted in full)

> The biggest reason for mini-batching is to make the key-level lock usage deterministic... If we always pass everything we received to IngestionBatchProcessor, the KeyLevelLocksManager will need to account for the max possible batch size, which is 100 IIRC.

Here, do you mean that KeyLevelLocksManager.maxPoolSize must not be smaller than the processed batch size, since otherwise the function that locks all keys of the batch would stall?

> 4) is not implemented yet, so the smaller batch size in theory shouldn't affect performance much. When working on 4), we will need a separate config to control the batch size, so that we can tune the memory usage against the performance benefits gained by de-duping.

> For 2), today's interface is Iterable, so we can't get the size without iterating it; to avoid the unnecessary copy, we would need to change the interface to something like List.

That is easy, since the upstream function is StorePartitionDataReceiver::write(List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumedData), which calls SIT::produceToStoreBufferServiceOrKafka (at which point the type gets constrained to Iterable). It could be made into a List all the way through.

Coming back to point 1... I've read some more of the code and I think I finally understand what's going on... the batch size is set so that IngestionBatchProcessor::process submits roughly 1 key per thread to the batchProcessingThreadPool... But is this going to be efficient? I wonder if the granularity is too fine... We will submit a lot of tasks into this thread pool. It might be better to submit fewer tasks, each of which contains a bigger collection of events to process.

Anyway, we can keep iterating on this new functionality and optimizing it later. No need to get everything perfect from the get go... but it seems to me that there might be many such opportunities. Lots of copying and processing that might be collapsible into fewer steps, and then also taking advantage of writes to the same key.
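
As a hedged sketch of those two follow-up ideas (coarser tasks for the thread pool, plus coalescing updates to the same key); the shaping below is hypothetical and not IngestionBatchProcessor's actual API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: group a consumed batch by key so updates to the same key
// can be processed together (fewer storage reads/writes), then spread the key
// groups over at most `numThreads` chunks so the thread pool receives a few
// larger tasks instead of roughly one task per key.
public final class BatchShapingSketch {
  static <K, R> List<List<List<R>>> shape(List<R> records, Function<R, K> keyOf, int numThreads) {
    // 1. Coalesce: one ordered group per key, preserving per-key record order.
    Map<K, List<R>> byKey = new LinkedHashMap<>();
    for (R record : records) {
      byKey.computeIfAbsent(keyOf.apply(record), k -> new ArrayList<>()).add(record);
    }
    // 2. Chunk: distribute the key groups across at most numThreads tasks.
    List<List<List<R>>> tasks = new ArrayList<>();
    for (int i = 0; i < numThreads; i++) {
      tasks.add(new ArrayList<>());
    }
    int idx = 0;
    for (List<R> group : byKey.values()) {
      tasks.get(idx++ % numThreads).add(group);
    }
    tasks.removeIf(List::isEmpty); // small batches may not need every thread
    return tasks;
  }
}
```

Each inner `List<List<R>>` would become one task submitted to the pool, and each key's records stay together so a single task could de-dupe or merge them before touching storage.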
