Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,26 @@ public void init(
this.latestOffset = latestOffset;

assert !segmentNode.sentinel();
KafkaCacheSegment newSegment = null;
while (newSegment == null)
if (segmentNode.segment() != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the assert above is true, such that segmentNode is not the sentinel, then is it still possible for segmentNode.segment() != null to be false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a possible case, as before we call cursor.init() method, mentioned check is done. Thus making segmentNode.segment() == null possible even after asserting !segmentNode.sentinel()

if (segmentNode.sentinel())
{
    segmentNode = segmentNode.next();
}

{
newSegment = segmentNode.segment().acquire();
if (newSegment == null)
KafkaCacheSegment newSegment = null;
while (newSegment == null)
{
segmentNode = segmentNode.next();
newSegment = segmentNode.segment().acquire();
if (newSegment == null)
{
segmentNode = segmentNode.next();
}
}
}
this.segmentNode = segmentNode;
this.segment = newSegment;
this.segmentNode = segmentNode;
this.segment = newSegment;

assert this.segmentNode != null;
assert this.segment != null;
assert this.segmentNode != null;
assert this.segment != null;

final int position = condition.reset(segment, offset, latestOffset, POSITION_UNSET);
this.position = position == RETRY_SEGMENT_VALUE || position == NEXT_SEGMENT_VALUE ? 0 : position;
final int position = condition.reset(segment, offset, latestOffset, POSITION_UNSET);
this.position = position == RETRY_SEGMENT_VALUE || position == NEXT_SEGMENT_VALUE ? 0 : position;
}
}

public KafkaCacheEntryFW next(
Expand All @@ -151,7 +154,7 @@ public KafkaCacheEntryFW next(
KafkaCacheEntryFW nextEntry = null;

next:
while (nextEntry == null)
while (segmentNode != null && nextEntry == null)
{
final int positionNext = condition.next(position);
if (positionNext == RETRY_SEGMENT_VALUE)
Expand Down Expand Up @@ -345,30 +348,33 @@ public void advance(
assert segmentNode != null;
assert segment != null;

KafkaCacheSegment newSegment = segmentNode.segment();
if (segment != newSegment)
if (segmentNode != null)
{
segment.release();

Node newSegmentNode = segmentNode;
newSegment = newSegment.acquire();
while (newSegment == null)
KafkaCacheSegment newSegment = segmentNode.segment();
if (segment != newSegment)
{
newSegment = newSegmentNode.segment().acquire();
if (newSegment == null)
segment.release();

Node newSegmentNode = segmentNode;
newSegment = newSegment.acquire();
while (newSegment == null)
{
newSegmentNode = newSegmentNode.next();
newSegment = newSegmentNode.segment().acquire();
if (newSegment == null)
{
newSegmentNode = newSegmentNode.next();
}
}
}
this.segmentNode = newSegmentNode;
this.segment = newSegment;
this.segmentNode = newSegmentNode;
this.segment = newSegment;

assert segmentNode != null;
assert !segmentNode.sentinel();
assert segment != null;
assert segmentNode != null;
assert !segmentNode.sentinel();
assert segment != null;

final int position = condition.reset(segment, offset, latestOffset, POSITION_UNSET);
this.position = position == RETRY_SEGMENT_VALUE || position == NEXT_SEGMENT_VALUE ? 0 : position;
final int position = condition.reset(segment, offset, latestOffset, POSITION_UNSET);
this.position = position == RETRY_SEGMENT_VALUE || position == NEXT_SEGMENT_VALUE ? 0 : position;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,14 +866,13 @@ private void onServerFanoutReplyData(

final long retainAt = partition.retainAt(nextHead.segment());
this.retainId = doServerFanoutInitialSignalAt(retainAt, traceId, SIGNAL_SEGMENT_RETAIN);
}

if (deleteId == NO_CANCEL_ID &&
partition.cleanupPolicy().delete() &&
!nextHead.previous().sentinel())
{
final long deleteAt = partition.deleteAt(nextHead.previous().segment(), retentionMillisMax);
this.deleteId = doServerFanoutInitialSignalAt(deleteAt, traceId, SIGNAL_SEGMENT_DELETE);
}
if (deleteId == NO_CANCEL_ID &&
partition.cleanupPolicy().delete())
{
final long deleteAt = partition.deleteAt(nextHead.segment(), retentionMillisMax);
this.deleteId = doServerFanoutInitialSignalAt(deleteAt, traceId, SIGNAL_SEGMENT_DELETE);
}

final int entryFlags = (flags & FLAGS_SKIP) != 0x00 ? CACHE_ENTRY_FLAGS_ABORTED : 0x00;
Expand Down Expand Up @@ -1187,15 +1186,16 @@ private void onServerFanoutInitialSignalSegmentDelete(
final long now = currentTimeMillis();

Node segmentNode = partition.sentinel().next();
while (segmentNode != partition.head() &&
partition.deleteAt(segmentNode.segment(), retentionMillisMax) <= now)
while (!segmentNode.sentinel() &&
partition.deleteAt(segmentNode.segment(), retentionMillisMax) <= now)
{
segmentNode.remove();
segmentNode = segmentNode.next();
}

assert segmentNode != null;

if (segmentNode != partition.head())
if (segmentNode != partition.sentinel())
{
final long deleteAt = partition.deleteAt(segmentNode.segment(), retentionMillisMax);
this.deleteId = doServerFanoutInitialSignalAt(deleteAt, traceId, SIGNAL_SEGMENT_DELETE);
Expand Down