-
Notifications
You must be signed in to change notification settings - Fork 2.4k
[HUDI-9438] Fix conflict handling for compaction instants for v8 tables #13347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
} | ||
|
||
@Test | ||
public void testConcurrentWritesWithInterleavingInflightCompaction() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved this to the other class to consolidate the tests
// 3. Get any completed replace commit that happened since the last successful write and any pending replace commit. | ||
// We need to check for write conflicts since they may have mutated the same files that are being newly created by the current write. | ||
Predicate<HoodieInstant> compactionInstantFilter = (HoodieInstant instant) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a new strategy class and not mess up the v8 and v6.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean a new implementation of ConflictResolutionStrategy
? It would allow the user to potentially specify the class that does not map to their table version which can cause issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to make the code cleaner is to have private methods for handling the different versions so this method just checks table version and then delegates to these methods. Then the javadocs per method can also contain the specifics for that table version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've split this out into two internal methods which seems like the best way to ensure users automatically pick up these improvements if they upgraded to table version 8. The diff is also smaller now so it is more clear what is changing as part of this PR.
Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline | ||
.getCommitsTimeline() | ||
.filterCompletedInstants() | ||
.filter(instant -> !isMoRTable || !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about introduce a new method: HoodieTimeline#getDataCommitsTime
- for mor: return [
DELTA_COMMIT
,REPLACE_COMMIT
] - for cow: return [
COMMIT
,REPLACE_COMMIT
]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a new strategy class and not mess up the v8 and v6.
we cannot add a new class, since this is a user facing config . +1 for separating code for v6 and v8 such that its easy to drop v6 later down the line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about introduce a new method:
HoodieTimeline#getDataCommitsTime
- for mor: return [
DELTA_COMMIT
,REPLACE_COMMIT
]- for cow: return [
COMMIT
,REPLACE_COMMIT
]
I think this handling is specific to conflict resolution. so good to keep it at this layer..
.getCommitsTimeline() | ||
.filterCompletedInstants() | ||
.filter(instant -> !isMoRTable || !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) | ||
.findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().requestedTime() : HoodieTimeline.INIT_INSTANT_TS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets use lastSuccessfulInstant.map().orElse()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar here. the only thing thats different is the extra filter.. so we could pull the rest to a private method shared by both paths. and do the extra filter.
.findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().requestedTime() : HoodieTimeline.INIT_INSTANT_TS) | ||
.getInstantsAsStream(); | ||
|
||
Stream<HoodieInstant> clusteringAndReplaceCommitInstants = activeTimeline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets pull this into a private method.. called from both places.. ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay realize the difference filterPendingReplaceOrClusteringTimeline
vs filterPendingReplaceClusteringAndCompactionTimeline
. So okay to leave it as is or try and consolidate so the filter differences are obvious
@ParameterizedTest | ||
@ValueSource(booleans = {false, true}) | ||
public void testConcurrentWritesWithInterleavingSuccessfulCompaction(boolean preTableVersion8) throws Exception { | ||
initMetaClient(preTableVersion8, HoodieTableType.MERGE_ON_READ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess there are only two scenarios, that are different. So Parameterizing the entire test class is an overkill
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its safe for v8+ writers to not conflict with scheduled or completed compactions.
Without multi-writer, v6 writers should n't be conflicting with completed compactions (it ll only conflict with scheduled compactions)... With multi-writer, v6 writers will conflict with both scheduled and completed compactions. (the hasConflict()
just looks for file group id overlap)
Let me know if yours is different. if not, LGTM. Suggest getting another review given Danny has started already..
.findInstantsAfter(lastSuccessfulInstant.isPresent() ? lastSuccessfulInstant.get().requestedTime() : HoodieTimeline.INIT_INSTANT_TS) | ||
.getInstantsAsStream(); | ||
|
||
Stream<HoodieInstant> clusteringAndReplaceCommitInstants = activeTimeline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay realize the difference filterPendingReplaceOrClusteringTimeline
vs filterPendingReplaceClusteringAndCompactionTimeline
. So okay to leave it as is or try and consolidate so the filter differences are obvious
@vinothchandar that is my understanding as well |
…es (apache#13347) * fix conflict handling for compaction given completion time changes * consolidate tests * split handling into two methods for ease of reading and debugging * extract common parts of the code
…es (apache#13347) * fix conflict handling for compaction given completion time changes * consolidate tests * split handling into two methods for ease of reading and debugging * extract common parts of the code
Change Logs
Impact
Risk level (write none, low medium or high below)
High, this is a core change to conflict detection.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
ticket number here and follow the instruction to make
changes to the website.
Contributor's checklist