1
+ /*
2
+ * Licensed to the Apache Software Foundation (ASF) under one or more
3
+ * contributor license agreements. See the NOTICE file distributed with
4
+ * this work for additional information regarding copyright ownership.
5
+ * The ASF licenses this file to You under the Apache License, Version 2.0
6
+ * (the "License"); you may not use this file except in compliance with
7
+ * the License. You may obtain a copy of the License at
8
+ *
9
+ * http://www.apache.org/licenses/LICENSE-2.0
10
+ *
11
+ * Unless required by applicable law or agreed to in writing, software
12
+ * distributed under the License is distributed on an "AS IS" BASIS,
13
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ * See the License for the specific language governing permissions and
15
+ * limitations under the License.
16
+ */
17
+ package org .apache .kafka .server .util .timer ;
18
+
19
+ import org .junit .jupiter .api .Test ;
20
+
21
+ import java .util .concurrent .DelayQueue ;
22
+ import java .util .concurrent .atomic .AtomicInteger ;
23
+
24
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
25
+ import static org .junit .jupiter .api .Assertions .assertFalse ;
26
+ import static org .junit .jupiter .api .Assertions .assertNotNull ;
27
+ import static org .junit .jupiter .api .Assertions .assertNull ;
28
+ import static org .junit .jupiter .api .Assertions .assertSame ;
29
+ import static org .junit .jupiter .api .Assertions .assertTrue ;
30
+
31
+ public class TimingWheelTest {
32
+
33
+ @ Test
34
+ public void testAddValidTask () {
35
+ AtomicInteger taskCounter = new AtomicInteger (0 );
36
+ DelayQueue <TimerTaskList > queue = new DelayQueue <>();
37
+ long startMs = 1000L ;
38
+ long tickMs = 10L ;
39
+ TimingWheel timingWheel = new TimingWheel (tickMs , 5 , startMs , taskCounter , queue );
40
+
41
+ // Create task within current time interval
42
+ long expirationMs = startMs + tickMs * 2 ; // 1020ms
43
+ TimerTask task = new TestTimerTask (tickMs * 2 );
44
+ TimerTaskEntry entry = new TimerTaskEntry (task , expirationMs );
45
+
46
+ assertTrue (timingWheel .add (entry ), "Should successfully add valid task" );
47
+ assertFalse (queue .isEmpty ());
48
+ assertEquals (1 , taskCounter .get ());
49
+ }
50
+
51
+ @ Test
52
+ public void testAddExpiredTask () {
53
+ long startMs = 1000L ;
54
+ TimingWheel timingWheel = new TimingWheel (
55
+ 10L ,
56
+ 5 ,
57
+ startMs ,
58
+ new AtomicInteger (0 ),
59
+ new DelayQueue <>()
60
+ );
61
+
62
+ long expirationMs = startMs - 1 ; // 999ms, less than current time
63
+ TimerTask task = new TestTimerTask (-1 );
64
+ TimerTaskEntry entry = new TimerTaskEntry (task , expirationMs );
65
+
66
+ assertFalse (timingWheel .add (entry ), "Expired task should not be added" );
67
+ }
68
+
69
+ @ Test
70
+ public void testAddCancelledTask () {
71
+ long startMs = 1000L ;
72
+ long tickMs = 10L ;
73
+ TimingWheel timingWheel = new TimingWheel (
74
+ tickMs ,
75
+ 5 ,
76
+ startMs ,
77
+ new AtomicInteger (0 ),
78
+ new DelayQueue <>()
79
+ );
80
+
81
+ long expirationMs = startMs + tickMs * 2 ;
82
+ TimerTask task = new TestTimerTask (tickMs * 2 );
83
+ TimerTaskEntry entry = new TimerTaskEntry (task , expirationMs );
84
+
85
+ task .cancel ();
86
+
87
+ assertFalse (timingWheel .add (entry ), "Cancelled task should not be added" );
88
+ assertTrue (task .isCancelled (), "Task should be marked as cancelled" );
89
+ }
90
+
91
+ @ Test
92
+ public void testAddTaskInCurrentBucket () {
93
+ long startMs = 1000L ;
94
+ TimingWheel timingWheel = new TimingWheel (
95
+ 10L ,
96
+ 5 ,
97
+ startMs ,
98
+ new AtomicInteger (0 ),
99
+ new DelayQueue <>()
100
+ );
101
+
102
+ long expirationMs = startMs + 5 ; // Within current tick
103
+ TimerTask task = new TestTimerTask (5 );
104
+ TimerTaskEntry entry = new TimerTaskEntry (task , expirationMs );
105
+
106
+ assertFalse (timingWheel .add (entry ), "Task within current tick should be expired immediately" );
107
+ }
108
+
109
+ @ Test
110
+ public void testAdvanceClockWithinTick () {
111
+ long startMs = 1000L ;
112
+ TimingWheel timingWheel = new TimingWheel (
113
+ 10L ,
114
+ 5 ,
115
+ startMs ,
116
+ new AtomicInteger (0 ),
117
+ new DelayQueue <>()
118
+ );
119
+
120
+ timingWheel .advanceClock (startMs + 5 );
121
+
122
+ assertEquals (startMs , timingWheel .currentTimeMs (), "Clock should not advance within the same tick" );
123
+ }
124
+
125
+ @ Test
126
+ public void testAdvanceClockToNextTick () {
127
+ long startMs = 1000L ;
128
+ long tickMs = 10L ;
129
+ TimingWheel timingWheel = new TimingWheel (
130
+ tickMs ,
131
+ 5 ,
132
+ startMs ,
133
+ new AtomicInteger (0 ),
134
+ new DelayQueue <>()
135
+ );
136
+
137
+ timingWheel .advanceClock (startMs + tickMs );
138
+
139
+ assertEquals (startMs + tickMs , timingWheel .currentTimeMs (), "Clock should advance to next tick" );
140
+ }
141
+
142
+ @ Test
143
+ public void testOverflowWheelCreation () {
144
+ long startMs = 1000L ;
145
+ long tickMs = 10L ;
146
+ int wheelSize = 5 ;
147
+ TimingWheel timingWheel = new TimingWheel (
148
+ tickMs ,
149
+ wheelSize ,
150
+ startMs ,
151
+ new AtomicInteger (0 ),
152
+ new DelayQueue <>()
153
+ );
154
+
155
+ assertNull (timingWheel .overflowWheel (), "Overflow wheel should not exist initially" );
156
+
157
+ // First overflow task should create parent wheel
158
+ long interval = tickMs * wheelSize ;
159
+ long overflowTime = startMs + interval + tickMs ;
160
+
161
+ TimerTask task = new TestTimerTask (interval + tickMs );
162
+ TimerTaskEntry entry = new TimerTaskEntry (task , overflowTime );
163
+
164
+ assertTrue (timingWheel .add (entry ));
165
+ assertNotNull (timingWheel .overflowWheel (), "Overflow wheel should be created" );
166
+
167
+ // Adding second overflow task should use existing parent wheel
168
+ TimingWheel existingOverflowWheel = timingWheel .overflowWheel ();
169
+ TimerTask task2 = new TestTimerTask (interval + tickMs + 1 );
170
+ TimerTaskEntry entry2 = new TimerTaskEntry (task2 , overflowTime + 1 );
171
+
172
+ assertTrue (timingWheel .add (entry2 ));
173
+ assertSame (existingOverflowWheel , timingWheel .overflowWheel ());
174
+ }
175
+
176
+ @ Test
177
+ public void testAdvanceClockWithOverflowWheel () {
178
+ long startMs = 1000L ;
179
+ long tickMs = 10L ;
180
+ int wheelSize = 5 ;
181
+ TimingWheel timingWheel = new TimingWheel (
182
+ tickMs ,
183
+ wheelSize ,
184
+ startMs ,
185
+ new AtomicInteger (0 ),
186
+ new DelayQueue <>()
187
+ );
188
+
189
+ // Create overflow wheel
190
+ long interval = tickMs * wheelSize ;
191
+ long overflowTime = startMs + interval + tickMs ;
192
+ TimerTask task = new TestTimerTask (interval + tickMs );
193
+ TimerTaskEntry entry = new TimerTaskEntry (task , overflowTime );
194
+ timingWheel .add (entry );
195
+
196
+ assertNotNull (timingWheel .overflowWheel (), "Overflow wheel should be created" );
197
+
198
+ // Advancing clock should also advance overflow wheel clock
199
+ long advanceTime = startMs + tickMs * wheelSize + 10 ; // 1060ms
200
+ timingWheel .advanceClock (advanceTime );
201
+
202
+ // Verify both wheels advanced
203
+ assertEquals (advanceTime , timingWheel .currentTimeMs (), "Main wheel clock should advance" );
204
+ assertEquals (startMs + tickMs * wheelSize , timingWheel .overflowWheel ().currentTimeMs (), "Overflow wheel clock should also advance" );
205
+ }
206
+
207
+ private static class TestTimerTask extends TimerTask {
208
+
209
+ TestTimerTask (long delayMs ) {
210
+ super (delayMs );
211
+ }
212
+
213
+ @ Override
214
+ public void run () {
215
+ // No-op
216
+ }
217
+ }
218
+ }
0 commit comments