Skip to content

Conversation

@zuston
Copy link
Member

@zuston zuston commented Apr 17, 2024

What changes were proposed in this pull request?

  1. make the write client always use the latest available assignment for the following writing when the block reassign happens.
  2. support multi time retry for partition reassign
  3. limit the max reassign server num of one partition
  4. refactor the reassign rpc
  5. rename the faultyServer -> receivingFailureServer.

Reassign whole process

image

Always using the latest assignment

To acheive always using the latest assignment, I introduce the TaskAttemptAssignment to get the latest assignment for current task. The creating process of AddBlockEvent also will apply the latest assignment by TaskAttemptAssignment

And it will be updated by the reassignOnBlockSendFailure rpc.
That means the original reassign rpc response will be refactored and replaced by the whole latest shuffleHandleInfo.

Why are the changes needed?

This PR is the subtask for #1608.

Leverging the #1615 / #1610 / #1609, we have implemented the reassign servers mechansim when write client encounters the server failure or unhealthy. But this is not good enough that will not share the faulty server state to the unstarted tasks and latter AddBlockEvent .

Does this PR introduce any user-facing change?

Yes.

How was this patch tested?

Unit and integration tests.

Integration tests as follows:

  1. PartitionBlockDataReassignBasicTest to validate the reassign mechanism valid
  2. PartitionBlockDataReassignMultiTimesTest is to test the partition reassign mechanism of multiple retries.

@zuston zuston requested review from EnricoMi and jerqi April 17, 2024 02:31
@github-actions
Copy link

github-actions bot commented Apr 17, 2024

Test Results

 2 391 files  + 42   2 391 suites  +42   4h 41m 32s ⏱️ + 29m 39s
   925 tests +  6     924 ✅ +  7   1 💤 ±0  0 ❌ ±0 
10 712 runs  +114  10 698 ✅ +116  14 💤 ±0  0 ❌ ±0 

Results for commit f44f6a4. ± Comparison against base commit 60fce8e.

This pull request removes 5 and adds 11 tests. Note that renamed tests count towards both.
org.apache.spark.shuffle.ShuffleHandleInfoTest ‑ testCreatePartitionReplicaTracking
org.apache.spark.shuffle.ShuffleHandleInfoTest ‑ testListAllPartitionAssignmentServers
org.apache.spark.shuffle.ShuffleHandleInfoTest ‑ testReassignment
org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactoryTest ‑ testShuffleManagerServerType
org.apache.uniffle.test.PartitionBlockDataReassignTest ‑ resultCompareTest
org.apache.hadoop.mapred.SortWriteBufferTest ‑ testSortBufferIterator
org.apache.spark.shuffle.handle.MutableShuffleHandleInfoTest ‑ testCreatePartitionReplicaTracking
org.apache.spark.shuffle.handle.MutableShuffleHandleInfoTest ‑ testListAllPartitionAssignmentServers
org.apache.spark.shuffle.handle.MutableShuffleHandleInfoTest ‑ testUpdateAssignment
org.apache.spark.shuffle.writer.RssShuffleWriterTest ‑ reassignMultiTimesForOnePartitionIdTest
org.apache.spark.shuffle.writer.RssShuffleWriterTest ‑ refreshAssignmentTest
org.apache.uniffle.server.buffer.ShuffleBufferManagerTest ‑ blockSizeMetricsTest
org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactoryTest ‑ testShuffleManagerServerType{ServerType}[1]
org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactoryTest ‑ testShuffleManagerServerType{ServerType}[2]
org.apache.uniffle.test.PartitionBlockDataReassignBasicTest ‑ resultCompareTest
…

♻️ This comment has been updated with latest results.

import org.apache.uniffle.common.exception.RssException;

/** This class is to wrap the shuffleHandleInfo to speed up the partitionAssignment getting. */
public class ShuffleHandleInfoWrapper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not adding this caching to ShuffleHandleInfo? With ShuffleHandleInfo.updateAssignmentPlan fetching and storing latestAssignment and ShuffleHandleInfo.getPartitionAssignment(taskAttemptId) providing the assignment from that Map.

Copy link
Member Author

@zuston zuston Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShuffleHandleInfo is always maintain the latest version in spark driver shuffleManager. But for shuffle writer, the holding shuffleHandleInfo is partially latest, which is updated by the grpc handle.

If using your way, it maybe cause some questions. Because the same object has different usage. It looks not clear.

Feel free to discuss more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are saying you do not want the ShuffleHandleInfo hold by the shuffle writer to change?

Copy link
Member Author

@zuston zuston Apr 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not accurate, I hope the updated latest cache is not maintained into the shuffleHandleInfo. Because it is used by the shuffle writer, but the shuffleHandleInfo is transferred by the grpc to writer(updated by the reassign rpc everytime). So this cache is not good for the shareable for all the tasks. maybe using a independent handle wrapper to hold cache is more clear.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling ShuffleHandleInfoWrapper(taskAttemptId, shuffleHandleInfo).retrievePartitionAssignment(partitionId) could be replaced with shuffleHandleInfo.getLatestAssignmentPlan(taskAttemptId).get(partitionId). To avoid fetching the whole plan for each partition, this wrapper caches it. Looks like this is the only purpose of this class, as indicated by the comment above

/** This class is to wrap the shuffleHandleInfo to speed up the partitionAssignment getting. */

Why can't ShuffleHandleInfo cache the partition assignment? Do you need ShuffleHandleInfo to be immutable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't ShuffleHandleInfo cache the partition assignment?

Yes. the cache is also OK, let me think twice

Do you need ShuffleHandleInfo to be immutable?

Needn't. The shuffleHandleInfo will be updated when the reassign occurs. But this will happen in driver side and then send back the latest handleInfo to executor task side.

…ter/ShuffleHandleInfoWrapper.java

Co-authored-by: Enrico Minack <[email protected]>
@jerqi
Copy link
Contributor

jerqi commented Apr 29, 2024

If one server becomes a faulty server, all tasks will change the assignment, won't they? Why do we need to record every task for a new assignment?

@zuston
Copy link
Member Author

zuston commented Apr 29, 2024

If one server becomes a faulty server, all tasks will change the assignment, won't they? Why do we need to record every task for a new assignment?

I want to clarity that receivingFailureServer should be scoped for partition block data rather than tasks. Because sometimes server will in high watermark with too much requests, so they will effect these partitioned data in that time. That means these partitioned data should be reassigned to another server. If this is not happened in other partitions, the assign will not be changed.

Why do we need to record every task for a new assignment?

I don't catch your thought about task -> assignment.

@jerqi
Copy link
Contributor

jerqi commented Apr 29, 2024

If one server becomes a faulty server, all tasks will change the assignment, won't they? Why do we need to record every task for a new assignment?

I want to clarity that receivingFailureServer should be scoped for partition block data rather than tasks. Because sometimes server will in high watermark with too much requests, so they will effect these partitioned data in that time. That means these partitioned data should be reassigned to another server. If this is not happened in other partitions, the assign will not be changed.

Why do we need to record every task for a new assignment?

I don't catch your thought about task -> assignment.

I got your point. You just record one reassignment but you re-balance them if you according to hash or range. It's ok that we store one assignment. But we should consider two class names.

receivingFailureServer

Could we return a high load error code to the server when the shuffle server has too high load? Is it a failure when we just return a high load error code?

TaskAssignment

Maybe we don't need to change this class name. Should we have a strategy class to handle the difference between faulty servers and high load servers.

@zuston
Copy link
Member Author

zuston commented Apr 29, 2024

It looks there was agreement on the regular partition reassignment, that's good.

Let's extending the topic to huge partition or high-load server that you defined.

Could we return a high load error code to the server when the shuffle server has too high load? Is it a failure when we just return a high load error code?

Yes. this is OK. Actually this could be implemented in shuffle-server side. And All need things for this high-load reassignment have been supported in client side.

Maybe we don't need to change this class name. Should we have a strategy class to handle the difference between faulty servers and high load servers.

Yes. this also could be implemented in TaskAssignment side, actually this has been supported in the pervious implementation but for better understand, I removed this part.

Anyway, I only do the regular reassignment here and ensure the expansibility for future development

@jerqi
Copy link
Contributor

jerqi commented Apr 29, 2024

It looks there was agreement on the regular partition reassignment, that's good.

Let's extending the topic to huge partition or high-load server that you defined.

Could we return a high load error code to the server when the shuffle server has too high load? Is it a failure when we just return a high load error code?

Yes. this is OK. Actually this could be implemented in shuffle-server side. And All need things for this high-load reassignment have been supported in client side.

Maybe we don't need to change this class name. Should we have a strategy class to handle the difference between faulty servers and high load servers.

Yes. this also could be implemented in TaskAssignment side, actually this has been supported in the pervious implementation but for better understand, I removed this part.

Anyway, I only do the regular reassignment here and ensure the expansibility for future development

  1. I just feel that receivingFailureServer isn't a good name.

  2. Could you extract a strategy class for the fault tolerance?

@zuston
Copy link
Member Author

zuston commented Apr 29, 2024

Could you extract a strategy class for the fault tolerance?

This could be as a pluggable strategy if need in the future.

@jerqi
Copy link
Contributor

jerqi commented Apr 30, 2024

Could you extract a strategy class for the fault tolerance?

This could be as a pluggable strategy if need in the future.

I want to strategy class to help us understand the class the TaskAttemptAssignment.

@zuston
Copy link
Member Author

zuston commented Apr 30, 2024

Could you extract a strategy class for the fault tolerance?

This could be as a pluggable strategy if need in the future.

I want to strategy class to help us understand the class the TaskAttemptAssignment.

Could you help directly review this class to leave the and comments and suggestions ?

jerqi
jerqi previously approved these changes Apr 30, 2024
Copy link
Contributor

@jerqi jerqi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Do you need to update your description of this pull request?

@zuston
Copy link
Member Author

zuston commented Apr 30, 2024

LGTM, Do you need to update your description of this pull request?

Updated.

Please take a look @EnricoMi If you have no problem for this, I will merge this.


import org.apache.uniffle.common.RemoteStorageInfo;

public abstract class ShuffleHandleInfoBase implements ShuffleHandleInfo, Serializable {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShuffleHandleInfoBase -> BaseShuffleHandleInfo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope the prefix of ShuffleHandleInfo could be placed in a near group,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base class is a more common name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like BaseShuffleHandleInfo, but a class name like {Interface}Base is common practice for a base implementation of an interface {Interface}.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's reserve this name.

import org.apache.uniffle.proto.RssProtos;

/** This class holds the dynamic partition assignment for partition reassign mechanism. */
public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A class should not be named after its usage, but after what it implements, as it has no knowledge about how it is being used, and it does not care, it provides only what it implements.

This handle info is used for fault tolerance, but there is no fault tolerance built into this class. It uses per-replica server infos to implement getAvailablePartitionServersForWriter() and getAllPartitionServersForReader().


import org.apache.uniffle.common.RemoteStorageInfo;

public abstract class ShuffleHandleInfoBase implements ShuffleHandleInfo, Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like BaseShuffleHandleInfo, but a class name like {Interface}Base is common practice for a base implementation of an interface {Interface}.

private boolean mutable = false;

public TaskAttemptAssignment(long taskAttemptId, ShuffleHandleInfo shuffleHandleInfo) {
this.assignment = shuffleHandleInfo.getAvailablePartitionServersForWriter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after removing mutable completely (see below), this should be changed to:

Suggested change
this.assignment = shuffleHandleInfo.getAvailablePartitionServersForWriter();
this.update(shuffleHandleInfo);

@zuston
Copy link
Member Author

zuston commented May 6, 2024

All done. PTAL again @jerqi @EnricoMi

@zuston
Copy link
Member Author

zuston commented May 7, 2024

Gentle ping @EnricoMi

@zuston zuston merged commit 30bf8dc into apache:master May 9, 2024
@zuston
Copy link
Member Author

zuston commented May 9, 2024

Merged. Thanks for your review @qqqttt123 @jerqi @dingshun3016 @xumanbu @EnricoMi

Feel free to discuss more if you have any suggestion @EnricoMi .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants