
Conversation

@leslizhang
Contributor

@leslizhang leslizhang commented Mar 13, 2024

What changes were proposed in this pull request?

In the implementation of the methods flushBuffer, handleEventAndUpdateMetrics, and removeBufferByShuffleId, read-write locks have been added to manage concurrency. This ensures that a ShuffleBuffer successfully converted into a flushEvent won't be cleaned up again by removeBufferByShuffleId, and a ShuffleBuffer already cleaned up by removeBufferByShuffleId won't be transformed back into a flushEvent. This effectively resolves the concurrency issue.

Why are the changes needed?

Fix #1571 & #1560 & #1542

The key logic of the PR is as follows:

Before this PR:

  1. A ShuffleBuffer is turned into a FlushEvent, and its blocks and size are cleared
  2. The FlushEvent is added to the flushing queue
  3. The method removeBufferByShuffleId is executed, which causes the following things to happen:

3.1. The following code snippet runs, but note that in the code below buffer.getBlocks() is already empty and size stays 0, because of step 1 above:

for (ShuffleBuffer buffer : buffers) {
  buffer.getBlocks().forEach(spb -> spb.getData().release());
  ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
  size += buffer.getSize();
}

3.2. appId is removed from the bufferPool

4. The FlushEvent is taken out from the queue and encounters an EventInvalidException because the appId was removed before

5. When handling the EventInvalidException, nothing is done and the event.doCleanup() method is not called, causing a memory leak.
Of course, this is just one concurrency scenario. In the previous code, without locking, the event could become invalid at any point while processFlushEvent was still executing, which is why #1542 exists. The same applies to #1560.


After this PR:
We acquire a read lock for steps 1 and 2 above, a write lock for step 3, and a read lock for step 4; when an EventInvalidException is encountered in step 5, we call the event.doCleanup() method to release the memory (a minimal, illustrative sketch of this locking pattern follows the list below).

In this way, we can ensure the following things when resources are being cleaned up:

  1. ShuffleBuffers that have not yet been converted to FlushEvents will not be converted in the future, but will be directly cleaned up.
  2. FlushEvents that have been converted from ShuffleBuffers will definitely encounter an EventInvalidException, and we will eventually handle this exception correctly, releasing memory.
  3. If there is already a FlushEvent being processed and it is about to be flushed to disk, the resource cleanup task will wait for all FlushEvents related to the appId to be completed before starting the cleanup task, ensuring that the cleanup and flushing tasks are completely independent and do not interfere with each other.
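For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the read/write-lock arrangement described above. All class and method names are illustrative placeholders, not the actual Uniffle classes; the real implementation in ShuffleBufferManager, ShuffleFlushManager, and ShuffleTaskManager differs in detail.

```
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: models one app's flush queue and the per-app
// read/write lock described in this PR. Not the actual Uniffle code.
class AppFlushCoordinatorSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Queue<FlushEventSketch> flushQueue = new ConcurrentLinkedQueue<>();
  private volatile boolean appRemoved = false;

  // Steps 1-2: convert a buffer into an event and enqueue it under the read
  // lock, so it cannot interleave with the cleanup (write-lock) path.
  void flushBuffer(BufferSketch buffer) {
    lock.readLock().lock();
    try {
      if (appRemoved) {
        buffer.release();            // app already purged: clean up directly
        return;
      }
      flushQueue.add(buffer.toEvent());
    } finally {
      lock.readLock().unlock();
    }
  }

  // Step 3: cleanup takes the write lock, so it waits for in-flight flush work
  // and blocks new conversions while it purges the app's state.
  void removeApp() {
    lock.writeLock().lock();
    try {
      appRemoved = true;
      // ... release remaining buffers and drop the app from the buffer pool ...
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Steps 4-5: event processing also runs under the read lock; an event that
  // became invalid (app removed) is cleaned up instead of being leaked.
  void processNextEvent() {
    FlushEventSketch event = flushQueue.poll();
    if (event == null) {
      return;
    }
    lock.readLock().lock();
    try {
      if (appRemoved) {
        event.doCleanup();           // the fix: release memory for invalid events
        return;
      }
      event.flushToStorage();
    } finally {
      lock.readLock().unlock();
    }
  }

  interface BufferSketch { FlushEventSketch toEvent(); void release(); }
  interface FlushEventSketch { void flushToStorage(); void doCleanup(); }
}
```

The point of the sketch is only the lock discipline: the flush and processing paths share the read lock, the cleanup path takes the write lock, and an invalidated event always reaches doCleanup().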

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@rickyma
Contributor

rickyma commented Mar 13, 2024

@jerqi @zuston PTAL.

@github-actions

github-actions bot commented Mar 13, 2024

Test Results

 2 340 files  ±0   2 340 suites  ±0   4h 32m 2s ⏱️ -26s
   908 tests ±0     907 ✅ ±0   1 💤 ±0  0 ❌ ±0 
10 541 runs  ±0  10 527 ✅ ±0  14 💤 ±0  0 ❌ ±0 

Results for commit 1786106. ± Comparison against base commit c3c0c37.

♻️ This comment has been updated with latest results.

@zuston
Member

zuston commented Mar 14, 2024

I want to know: if the flush event has been pushed into the handler queue before the app expires, what will happen when this event is being handled but the app has already expired?

Contributor

@advancedxy advancedxy left a comment

Did a quick look.

It seems that you didn't add a new test that demonstrates the memory leak issue. Would you mind adding a test case that shows the concurrency issues are indeed fixed?

taskInfo.getCommitLocks().remove(shuffleId);
LOG.info("Start remove resource for appId[{}], shuffleIds[{}]", appId, shuffleIds);
final long start = System.currentTimeMillis();
final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
Contributor

I think we need to limit the lock scope here.

writeLock.lock() should start here rather than at the beginning of the method.

Contributor

I think we need to limit the lock scope here.

writeLock.lock() should start here rather than at the beginning of the method.

Cleaning up resources is not a frequent operation. It only happens when an appId expires and is removed, or when unregistering a shuffleId. Therefore, I don't think there's a need to narrow the scope of the lock, as it won't cause any performance bottlenecks. And the code will be more readable.
Also, we have conducted performance stress tests on this PR and it does not impact performance.

Contributor

Therefore, I don't think there's a need to narrow the scope of the lock, as it won't cause any performance bottlenecks.

I don't think the rationale for not narrowing down the lock scope should be that it won't cause any performance issues. Narrowing down the lock scope usually helps reduce potential deadlock issues; for example, when the lock scope is long enough, it's more likely that another operation (with locking) is added by later developers without them knowing it would cause deadlocks. Therefore, it's considered best practice to always narrow down the lock scope.

For this particular case though, other methods already have long lock scope, which should be addressed later. I'm ok to leave it as it is for now.
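As a general illustration of that point (this is not the Uniffle method; the names below are made up for the example), narrowing the scope means starting the write lock only at the critical mutation rather than at method entry:

```
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch: keep non-critical work (lookups, timing, logging)
// outside the write lock and guard only the state that races with other paths.
class ScopedLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<String, Object> bufferPool = new ConcurrentHashMap<>();

  void removeResources(String appId) {
    long start = System.currentTimeMillis();  // preparation: no lock needed
    lock.writeLock().lock();                  // lock starts here, not at method entry
    try {
      bufferPool.remove(appId);               // only the racy mutation is guarded
    } finally {
      lock.writeLock().unlock();
    }
    System.out.println("removed " + appId + " in "
        + (System.currentTimeMillis() - start) + " ms");
  }
}
```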

event.markOwnedByHugePartition();

ReentrantReadWriteLock.ReadLock readLock = shuffleTaskManager.getAppReadLock(appId);
readLock.lock();
Contributor

ditto..

endPartition,
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
event.setReadLock(readLock);
Contributor

This doesn't seem idiomatic.
A more appropriate way would be to pass the read lock to the constructor of ShuffleDataFlushEvent, so that readLock is never changed after construction.

The lock logic could be hidden behind ShuffleDataFlushEvent's internal processing logic, or exposed like this:

class ShuffleDataFlushEvent {
  <T> T withReadLock(Supplier<T> action) {
    readLock.lock();
    try {
      return action.get();
    } finally {
      readLock.unlock();
    }
  }
}
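For concreteness, a self-contained sketch of that suggestion might look as follows. The class and field names are placeholders, not the real ShuffleDataFlushEvent, and the validity flag is an assumption for illustration only:

```
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Illustration of the suggestion above: the lock is fixed at construction time
// and callers go through withReadLock(...). Names are placeholders.
class DataFlushEventSketch {
  private final ReentrantReadWriteLock.ReadLock readLock;
  private volatile boolean valid = true;

  DataFlushEventSketch(ReentrantReadWriteLock.ReadLock readLock) {
    this.readLock = readLock;   // never reassigned after construction
  }

  <T> T withReadLock(Supplier<T> action) {
    readLock.lock();
    try {
      return action.get();
    } finally {
      readLock.unlock();
    }
  }

  void markInvalid() { valid = false; }

  // Hypothetical caller: flush under the read lock, clean up if invalidated.
  boolean processEvent() {
    return withReadLock(() -> {
      if (!valid) {
        // the real code would call doCleanup() here to release the blocks
        return false;
      }
      // ... write blocks to storage ...
      return true;
    });
  }
}
```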

Contributor

@rickyma rickyma Mar 15, 2024

The code has been refactored, no need to do this anymore.

@leslizhang leslizhang force-pushed the issue-1572 branch 4 times, most recently from 41e12ae to dbaeb06 on March 15, 2024 at 06:24
@codecov-commenter

codecov-commenter commented Mar 15, 2024

Codecov Report

Attention: Patch coverage is 83.33333% with 10 lines in your changes missing coverage. Please review.

Project coverage is 54.95%. Comparing base (9737d57) to head (6385268).
Report is 5 commits behind head on master.

Files Patch % Lines
.../org/apache/uniffle/server/ShuffleTaskManager.java 81.81% 4 Missing and 2 partials ⚠️
...he/uniffle/server/buffer/ShuffleBufferManager.java 78.94% 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1574      +/-   ##
============================================
+ Coverage     53.96%   54.95%   +0.98%     
- Complexity     2858     2861       +3     
============================================
  Files           438      418      -20     
  Lines         24793    22480    -2313     
  Branches       2109     2113       +4     
============================================
- Hits          13379    12353    -1026     
+ Misses        10576     9356    -1220     
+ Partials        838      771      -67     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@rickyma
Contributor

rickyma commented Mar 15, 2024

I want to know: if the flush event has been pushed into the handler queue before the app expires, what will happen when this event is being handled but the app has already expired?

To make it clear for everyone, I will try to explain this issue in detail.

Before this PR:

  1. A ShuffleBuffer is turned into a FlushEvent, and its blocks and size are cleared
  2. The FlushEvent is added to the flushing queue
  3. The method removeBufferByShuffleId is executed, which causes the following things to happen:

3.1. The following code snippet runs, but note that in the code below buffer.getBlocks() is already empty and size stays 0, because of step 1 above:

for (ShuffleBuffer buffer : buffers) {
  buffer.getBlocks().forEach(spb -> spb.getData().release());
  ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
  size += buffer.getSize();
}

3.2. appId is removed from the bufferPool

4. The FlushEvent is taken out from the queue and encounters an EventInvalidException because the appId was removed before

5. When handling the EventInvalidException, nothing is done and the event.doCleanup() method is not called, causing a memory leak.
Of course, this is just one concurrency scenario. In the previous code, without locking, the event could become invalid at any point while processFlushEvent was still executing, which is why #1542 exists. The same applies to #1560.


After this PR:
We acquire a read lock for steps 1 and 2 above, a write lock for step 3, and a read lock for step 4; when an EventInvalidException is encountered in step 5, we call the event.doCleanup() method to release the memory.

In this way, we can ensure the following things when resources are being cleaned up:

  1. ShuffleBuffers that have not yet been converted to FlushEvents will not be converted in the future, but will be directly cleaned up.
  2. FlushEvents that have been converted from ShuffleBuffers will definitely encounter an EventInvalidException, and we will eventually handle this exception correctly, releasing memory.
  3. If there is already a FlushEvent being processed and it is about to be flushed to disk, the resource cleanup task will wait for all FlushEvents related to the appId to be completed before starting the cleanup task, ensuring that the cleanup and flushing tasks are completely independent and do not interfere with each other.

@rickyma
Contributor

rickyma commented Mar 15, 2024

@advancedxy PTAL again, the code is updated. I don't think we need to add new UTs, because in this PR we have already added locks into the existing UTs. If anything goes wrong, we will know it.

@rickyma
Contributor

rickyma commented Mar 15, 2024

@jerqi @zuston Please rebuild this PR, thanks.

@rickyma
Contributor

rickyma commented Mar 15, 2024

This PR should be merged ASAP, because this issue is a deeply hidden and serious problem.

@jerqi
Contributor

jerqi commented Mar 16, 2024


@leslizhang Could you update the description of the pull request with the explanation above? Could you add a UT for this case?

@leslizhang
Contributor Author

leslizhang commented Mar 16, 2024


@leslizhang Could you update the description of the pull request with the explanation above? Could you add a UT for this case?

Thank you for the suggestion.

Could you update the description of the pull request with the explanation above?

done

Could you add a UT for this case?

One more note:
The essence of this PR is to add lock control around the read and write operations on blocks in shuffle server memory: tasks that delete blocks take the write lock (they are treated as write operations on blocks), while tasks that flush blocks (including converting blocks into events and flushing the generated events) take the read lock. This PR does not change the block lifecycle processing flow. As for UTs, read-write lock synchronization has been added to all existing unit tests (ShuffleBufferManagerTest, ShuffleFlushManagerTest, etc.) that cache or flush blocks. In other words, the unit testing for this PR is done by adapting and modifying the existing unit test code.
Thanks again, @jerqi.

@jerqi
Contributor

jerqi commented Mar 16, 2024

OK for me. Let others take another look.

@rickyma
Contributor

rickyma commented Mar 18, 2024

ping @zuston @advancedxy

@advancedxy
Contributor

I don't think we need to add new UTs, because in this PR we have already added locks into the existing UTs. If anything goes wrong, we will know it.

As for UTs, read-write lock synchronization has been added to all existing unit tests (ShuffleBufferManagerTest, ShuffleFlushManagerTest, etc.) that cache or flush blocks. In other words, the unit testing for this PR is done by adapting and modifying the existing unit test code.

Hmmm, I think the new unit test cases means that is it possible to adding new UTs to demonstrate the issues you described in your PR? Since concurrency issues are happened in production env, they indicates that the existing UTs doesn't cover the concurrency issues.

If it's too hard or too complicated to add cases like that, you can simply state that and I think we are fine to merge it for now. Otherwise, the existing test cases cannot prove you have indeed fixed the concurrency issues.

Member

@zuston zuston left a comment

LGTM overall. I also hope this concurrency problem can be covered by test cases. I know the lock ensures the remove + purge event sequence, but we'd better add tests to make sure the logic isn't changed accidentally in the future.

private final long size;
private final List<ShufflePartitionedBlock> shuffleBlocks;
private final Supplier<Boolean> valid;

Member

remove this.

Contributor Author

done

@leslizhang
Contributor Author


This is not a consistently reproducible issue, so it is difficult to conduct positive and negative tests. We have already run more than ten 20TB TPCH SQL tests in our stress test environment without encountering any problems. Before fixing this issue, it occurred relatively frequently. @advancedxy

@rickyma
Contributor

rickyma commented Mar 18, 2024

@zuston @advancedxy I think we can merge this for now.

@zuston
Member

zuston commented Mar 18, 2024

This is not a consistently reproducible issue, so it is difficult to conduct positive and negative tests. We have already run more than ten 20TB TPCH SQL tests in our stress test environment without encountering any problems. Before fixing this issue, it occurred relatively frequently. @advancedxy

Got your point, but a unit test could mock this problem.
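To make the idea concrete, a deterministic test could force the problematic interleaving with latches. This is only a skeleton under assumed names, not the real Uniffle test classes: the stand-in threads would be replaced by the actual enqueue/cleanup/processFlushEvent calls, and the assertion would check that the invalid event's memory was released.

```
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;

// Illustrative test skeleton: forces "remove app" to run between enqueueing a
// flush event and processing it, then checks that cleanup still happened.
class FlushCleanupRaceTest {

  @Test
  void invalidEventIsCleanedUpAfterAppRemoval() throws Exception {
    AtomicBoolean cleanedUp = new AtomicBoolean(false);
    CountDownLatch eventEnqueued = new CountDownLatch(1);
    CountDownLatch appRemoved = new CountDownLatch(1);

    // Stand-in for the flush thread: enqueue the event, then wait until the
    // cleanup thread has removed the app before "processing" the event.
    Thread flushThread = new Thread(() -> {
      eventEnqueued.countDown();                // event is now in the queue
      try {
        appRemoved.await(5, TimeUnit.SECONDS);  // cleanup happens in between
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      // In the real test, processing would hit EventInvalidException and must
      // call doCleanup(); here we only record that the cleanup path ran.
      cleanedUp.set(true);
    });

    // Stand-in for the cleanup thread: purges the app state only after the
    // event has already been enqueued, mimicking removeBufferByShuffleId.
    Thread cleanupThread = new Thread(() -> {
      try {
        eventEnqueued.await(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      appRemoved.countDown();                   // app state purged
    });

    flushThread.start();
    cleanupThread.start();
    flushThread.join(5000);
    cleanupThread.join(5000);

    assertTrue(cleanedUp.get(), "invalid event must still release its memory");
  }
}
```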

@zuston
Member

zuston commented Mar 18, 2024

@zuston @advancedxy I think we can merge this for now.

If you insist, I will merge this before completing the unit tests. But I hope the test cases could be added later in another PR.

Anyway, the current fix LGTM. Merged.

@zuston zuston merged commit d851b2e into apache:master Mar 18, 2024
zuston pushed a commit to zuston/incubator-uniffle that referenced this pull request May 27, 2024
…n` occurs (apache#1574)
