Skip to content

Commit d350f60

Browse files
KAFKA-18265: Move inflight batch and state classes from SharePartition (2/N) (#20230)
Another refactor PR to move in-flight batch and state out of SharePartition. This PR concludes the refactoring and subsequent PRs for this ticket will involve code cleanups and better lock handling. However, the intent is to keep PRs small so they can be reviewed easily. Reviewers: Andrew Schofield <[email protected]>
1 parent a663ce3 commit d350f60

File tree

7 files changed

+712
-442
lines changed

7 files changed

+712
-442
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 47 additions & 401 deletions
Large diffs are not rendered by default.

core/src/test/java/kafka/server/share/SharePartitionTest.java

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package kafka.server.share;
1818

1919
import kafka.server.ReplicaManager;
20-
import kafka.server.share.SharePartition.InFlightState;
21-
import kafka.server.share.SharePartition.RecordState;
2220
import kafka.server.share.SharePartition.SharePartitionState;
2321
import kafka.server.share.SharePartitionManager.SharePartitionListener;
2422

@@ -57,6 +55,8 @@
5755
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
5856
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
5957
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
58+
import org.apache.kafka.server.share.fetch.InFlightState;
59+
import org.apache.kafka.server.share.fetch.RecordState;
6060
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
6161
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
6262
import org.apache.kafka.server.share.persister.NoOpStatePersister;
@@ -140,45 +140,6 @@ public void tearDown() throws Exception {
140140
sharePartitionMetrics.close();
141141
}
142142

143-
@Test
144-
public void testRecordStateValidateTransition() {
145-
// Null check.
146-
assertThrows(NullPointerException.class, () -> RecordState.AVAILABLE.validateTransition(null));
147-
// Same state transition check.
148-
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.AVAILABLE));
149-
assertThrows(IllegalStateException.class, () -> RecordState.ACQUIRED.validateTransition(RecordState.ACQUIRED));
150-
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACKNOWLEDGED));
151-
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
152-
// Invalid state transition to any other state from Acknowledged state.
153-
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.AVAILABLE));
154-
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ACQUIRED));
155-
assertThrows(IllegalStateException.class, () -> RecordState.ACKNOWLEDGED.validateTransition(RecordState.ARCHIVED));
156-
// Invalid state transition to any other state from Archived state.
157-
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.AVAILABLE));
158-
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ACKNOWLEDGED));
159-
assertThrows(IllegalStateException.class, () -> RecordState.ARCHIVED.validateTransition(RecordState.ARCHIVED));
160-
// Invalid state transition to any other state from Available state other than Acquired.
161-
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ACKNOWLEDGED));
162-
assertThrows(IllegalStateException.class, () -> RecordState.AVAILABLE.validateTransition(RecordState.ARCHIVED));
163-
164-
// Successful transition from Available to Acquired.
165-
assertEquals(RecordState.ACQUIRED, RecordState.AVAILABLE.validateTransition(RecordState.ACQUIRED));
166-
// Successful transition from Acquired to any state.
167-
assertEquals(RecordState.AVAILABLE, RecordState.ACQUIRED.validateTransition(RecordState.AVAILABLE));
168-
assertEquals(RecordState.ACKNOWLEDGED, RecordState.ACQUIRED.validateTransition(RecordState.ACKNOWLEDGED));
169-
assertEquals(RecordState.ARCHIVED, RecordState.ACQUIRED.validateTransition(RecordState.ARCHIVED));
170-
}
171-
172-
@Test
173-
public void testRecordStateForId() {
174-
assertEquals(RecordState.AVAILABLE, RecordState.forId((byte) 0));
175-
assertEquals(RecordState.ACQUIRED, RecordState.forId((byte) 1));
176-
assertEquals(RecordState.ACKNOWLEDGED, RecordState.forId((byte) 2));
177-
assertEquals(RecordState.ARCHIVED, RecordState.forId((byte) 4));
178-
// Invalid check.
179-
assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5));
180-
}
181-
182143
@Test
183144
public void testMaybeInitialize() throws InterruptedException {
184145
Persister persister = Mockito.mock(Persister.class);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share.fetch;
18+
19+
/**
 * Describes what should happen to a delivery count: it can be increased, decreased,
 * or left untouched.
 */
public enum DeliveryCountOps {
    INCREASE,
    DECREASE,
    NO_OP
}
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share.fetch;
18+
19+
import org.apache.kafka.common.utils.Time;
20+
import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
21+
import org.apache.kafka.server.util.timer.Timer;
22+
23+
import java.util.NavigableMap;
24+
import java.util.concurrent.ConcurrentSkipListMap;
25+
26+
/**
27+
* The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records.
28+
*/
29+
public class InFlightBatch {
30+
// The timer is used to schedule the acquisition lock timeout task for the batch.
31+
private final Timer timer;
32+
// The time is used to get the current time in milliseconds.
33+
private final Time time;
34+
// The offset of the first record in the batch that is fetched from the log.
35+
private final long firstOffset;
36+
// The last offset of the batch that is fetched from the log.
37+
private final long lastOffset;
38+
// The acquisition lock timeout handler is used to release the acquired records when the acquisition
39+
// lock timeout is reached.
40+
private final AcquisitionLockTimeoutHandler timeoutHandler;
41+
// The share partition metrics are used to track the metrics related to the share partition.
42+
private final SharePartitionMetrics sharePartitionMetrics;
43+
44+
// The batch state of the fetched records. If the offset state map is empty then batchState
45+
// determines the state of the complete batch else individual offset determines the state of
46+
// the respective records.
47+
private InFlightState batchState;
48+
49+
// The offset state map is used to track the state of the records per offset. However, the
50+
// offset state map is only required when the state of the offsets within same batch are
51+
// different. The states can be different when explicit offset acknowledgment is done which
52+
// is different from the batch state.
53+
private NavigableMap<Long, InFlightState> offsetState;
54+
55+
public InFlightBatch(
56+
Timer timer,
57+
Time time,
58+
String memberId,
59+
long firstOffset,
60+
long lastOffset,
61+
RecordState state,
62+
int deliveryCount,
63+
AcquisitionLockTimerTask acquisitionLockTimeoutTask,
64+
AcquisitionLockTimeoutHandler timeoutHandler,
65+
SharePartitionMetrics sharePartitionMetrics
66+
) {
67+
this.timer = timer;
68+
this.time = time;
69+
this.firstOffset = firstOffset;
70+
this.lastOffset = lastOffset;
71+
this.timeoutHandler = timeoutHandler;
72+
this.sharePartitionMetrics = sharePartitionMetrics;
73+
this.batchState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
74+
}
75+
76+
/**
77+
* @return the first offset of the batch.
78+
*/
79+
public long firstOffset() {
80+
return firstOffset;
81+
}
82+
83+
/**
84+
* @return the last offset of the batch.
85+
*/
86+
public long lastOffset() {
87+
return lastOffset;
88+
}
89+
90+
/**
91+
* @return the state of the batch.
92+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
93+
*/
94+
public RecordState batchState() {
95+
return inFlightState().state();
96+
}
97+
98+
/**
99+
* @return the member id of the batch.
100+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
101+
*/
102+
public String batchMemberId() {
103+
return inFlightState().memberId();
104+
}
105+
106+
/**
107+
* @return the delivery count of the batch.
108+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
109+
*/
110+
public int batchDeliveryCount() {
111+
return inFlightState().deliveryCount();
112+
}
113+
114+
/**
115+
* @return the acquisition lock timeout task for the batch.
116+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
117+
*/
118+
public AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
119+
return inFlightState().acquisitionLockTimeoutTask();
120+
}
121+
122+
/**
123+
* @return the offset state map which maintains the state of the records per offset.
124+
*/
125+
public NavigableMap<Long, InFlightState> offsetState() {
126+
return offsetState;
127+
}
128+
129+
/**
130+
* Cancel the acquisition lock timeout task and clear the reference to it.
131+
* This method is used to cancel the acquisition lock timeout task if it exists
132+
* and clear the reference to it.
133+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
134+
*/
135+
public void cancelAndClearAcquisitionLockTimeoutTask() {
136+
inFlightState().cancelAndClearAcquisitionLockTimeoutTask();
137+
}
138+
139+
/**
140+
* @return true if the batch has an ongoing state transition, false otherwise.
141+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
142+
*/
143+
public boolean batchHasOngoingStateTransition() {
144+
return inFlightState().hasOngoingStateTransition();
145+
}
146+
147+
/**
148+
* Archive the batch state. This is used to mark the batch as archived and no further updates
149+
* are allowed to the batch state.
150+
* @param newMemberId The new member id for the records.
151+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
152+
*/
153+
public void archiveBatch(String newMemberId) {
154+
inFlightState().archive(newMemberId);
155+
}
156+
157+
/**
158+
* Try to update the batch state. The state of the batch can only be updated if the new state is allowed
159+
* to be transitioned from old state. The delivery count is not changed if the state update is unsuccessful.
160+
*
161+
* @param newState The new state of the records.
162+
* @param ops The behavior on the delivery count.
163+
* @param maxDeliveryCount The maximum delivery count for the records.
164+
* @param newMemberId The new member id for the records.
165+
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
166+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
167+
*/
168+
public InFlightState tryUpdateBatchState(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
169+
return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
170+
}
171+
172+
/**
173+
* Start a state transition for the batch. This is used to mark the batch as in-flight and
174+
* no further updates are allowed to the batch state.
175+
*
176+
* @param newState The new state of the records.
177+
* @param ops The behavior on the delivery count.
178+
* @param maxDeliveryCount The maximum delivery count for the records.
179+
* @param newMemberId The new member id for the records.
180+
* @return {@code InFlightState} if update succeeds, null otherwise. Returning state helps update chaining.
181+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
182+
*/
183+
public InFlightState startBatchStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount,
184+
String newMemberId
185+
) {
186+
return inFlightState().startStateTransition(newState, ops, maxDeliveryCount, newMemberId);
187+
}
188+
189+
/**
190+
* Initialize the offset state map if it is not already initialized. This is used to maintain the state of the
191+
* records per offset when the state of the offsets within same batch are different.
192+
*/
193+
public void maybeInitializeOffsetStateUpdate() {
194+
if (offsetState == null) {
195+
offsetState = new ConcurrentSkipListMap<>();
196+
// The offset state map is not initialized hence initialize the state of the offsets
197+
// from the first offset to the last offset. Mark the batch inflightState to null as
198+
// the state of the records is maintained in the offset state map now.
199+
for (long offset = this.firstOffset; offset <= this.lastOffset; offset++) {
200+
if (batchState.acquisitionLockTimeoutTask() != null) {
201+
// The acquisition lock timeout task is already scheduled for the batch, hence we need to schedule
202+
// the acquisition lock timeout task for the offset as well.
203+
long delayMs = batchState.acquisitionLockTimeoutTask().expirationMs() - time.hiResClockMs();
204+
AcquisitionLockTimerTask timerTask = acquisitionLockTimerTask(batchState.memberId(), offset, offset, delayMs);
205+
offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId(), timerTask));
206+
timer.add(timerTask);
207+
} else {
208+
offsetState.put(offset, new InFlightState(batchState.state(), batchState.deliveryCount(), batchState.memberId()));
209+
}
210+
}
211+
// Cancel the acquisition lock timeout task for the batch as the offset state is maintained.
212+
if (batchState.acquisitionLockTimeoutTask() != null) {
213+
batchState.cancelAndClearAcquisitionLockTimeoutTask();
214+
}
215+
batchState = null;
216+
}
217+
}
218+
219+
/**
220+
* Update the acquisition lock timeout task for the batch. This is used to update the acquisition lock timeout
221+
* task for the batch when the acquisition lock timeout is changed.
222+
*
223+
* @param acquisitionLockTimeoutTask The new acquisition lock timeout task for the batch.
224+
* @throws IllegalStateException if the offset state is maintained and the batch state is not available.
225+
*/
226+
public void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
227+
inFlightState().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
228+
}
229+
230+
private InFlightState inFlightState() {
231+
if (batchState == null) {
232+
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
233+
}
234+
return batchState;
235+
}
236+
237+
private AcquisitionLockTimerTask acquisitionLockTimerTask(
238+
String memberId,
239+
long firstOffset,
240+
long lastOffset,
241+
long delayMs
242+
) {
243+
return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, timeoutHandler, sharePartitionMetrics);
244+
}
245+
246+
@Override
247+
public String toString() {
248+
return "InFlightBatch(" +
249+
"firstOffset=" + firstOffset +
250+
", lastOffset=" + lastOffset +
251+
", inFlightState=" + batchState +
252+
", offsetState=" + ((offsetState == null) ? "null" : offsetState) +
253+
")";
254+
}
255+
}

0 commit comments

Comments
 (0)