-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix MultiWriterIdGenerator.current_position
.
#8257
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix non-user visible bug in implementation of `MultiWriterIdGenerator.get_current_token_for_writer`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -219,6 +219,10 @@ def __init__( | |
# should be less than the minimum of this set (if not empty). | ||
self._unfinished_ids = set() # type: Set[int] | ||
|
||
# Set of local IDs that we've processed that are larger than the current | ||
# position, due to there being smaller unpersisted IDs. | ||
self._finished_ids = set() # type: Set[int] | ||
|
||
# We track the max position where we know everything before has been | ||
# persisted. This is done by a) looking at the min across all instances | ||
# and b) noting that if we have seen a run of persisted positions | ||
|
@@ -352,12 +356,39 @@ def _mark_id_as_finished(self, next_id: int): | |
|
||
with self._lock: | ||
self._unfinished_ids.discard(next_id) | ||
self._finished_ids.add(next_id) | ||
|
||
new_cur = None | ||
|
||
if self._unfinished_ids: | ||
# If there are unfinished IDs then the new position will be the | ||
# largest finished ID less than the minimum unfinished ID. | ||
Comment on lines
+364
to
+365
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm unsure if it is clearer, but I think something like: new_cur = min_finished
for s in self._finished_ids:
if s < min_funished:
new_cur = max(new_cur, s)
else:
finished.add(s) But that might make the handling later on harder. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potentially, but then you iterate over the list twice (since you're doing a new_curr = min(self._finished_ids)
self._finished_ids = set(i for i in self._finished_ids if i > new_curr) or something? |
||
|
||
finished = set() | ||
|
||
min_unfinshed = min(self._unfinished_ids) | ||
for s in self._finished_ids: | ||
if s < min_unfinshed: | ||
if not new_cur or new_cur < s: | ||
|
||
new_cur = s | ||
else: | ||
finished.add(s) | ||
|
||
# We clear these out since they're now all less than the new | ||
# position. | ||
self._finished_ids = finished | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this essentially discarding all finished IDs that are less than the minimum unfinished ID? I'm guessing we can't discard in a loop (which would be clearer to me) because Python doesn't like you modifying things as you iterate them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This all feels like it is doing a lot more work than before, but I guess both the old code and new are essentially iterating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, basically we want to remove all items that are less than minimum unfinished ID. We could rewrite it to be a sorted list instead and then chop off the front of it, but its not obvious that that is better/quicker/easier. |
||
else: | ||
# There are no unfinished IDs so the new position is simply the | ||
# largest finished one. | ||
new_cur = max(self._finished_ids) | ||
|
||
# We clear these out since they're now all less than the new | ||
# position. | ||
self._finished_ids.clear() | ||
|
||
# Figure out if its safe to advance the position by checking there | ||
# aren't any lower allocated IDs that are yet to finish. | ||
if all(c > next_id for c in self._unfinished_ids): | ||
if new_cur: | ||
curr = self._current_positions.get(self._instance_name, 0) | ||
self._current_positions[self._instance_name] = max(curr, next_id) | ||
self._current_positions[self._instance_name] = max(curr, new_cur) | ||
|
||
self._add_persisted_position(next_id) | ||
|
||
|
@@ -425,7 +456,7 @@ def _add_persisted_position(self, new_id: int): | |
# We move the current min position up if the minimum current positions | ||
# of all instances is higher (since by definition all positions less | ||
# that that have been persisted). | ||
min_curr = min(self._current_positions.values()) | ||
min_curr = min(self._current_positions.values(), default=0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm struggling to see how this is directly related to this PR? The change looks fine though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sigh, sorry, that is a somewhat unrelated fix that broke that particular sytest. Basically, if the first time a worker see's an ID go past it is out of order, you can get here without anything having been added to current positions, causing this to explode. |
||
self._persisted_upto_position = max(min_curr, self._persisted_upto_position) | ||
|
||
# We now iterate through the seen positions, discarding those that are | ||
|
Uh oh!
There was an error while loading. Please reload this page.