Skip to content

Commit b313f57

Browse files
committed
Polish #4027
Merged RepeatSpec with Repeat to avoid exposing package private RepeatSignal. Also, removed explicit contextWrite which is unnecessary when there is no onErrorStop used like with Retry companion. Fixes #3545 Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
1 parent c6574a1 commit b313f57

File tree

4 files changed

+113
-162
lines changed

4 files changed

+113
-162
lines changed

reactor-core/src/main/java/reactor/util/repeat/ImmutableRepeatSignal.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import java.time.Duration;
2222

2323
/**
24-
* An immutable {@link reactor.util.repeat.Repeat.RepeatSignal} that can be used for
24+
* An immutable {@link reactor.util.repeat.RepeatSpec.RepeatSignal} that can be used for
2525
* retained copies of mutable implementations.
2626
*
2727
* @author Daeho Kwon
2828
*/
29-
final class ImmutableRepeatSignal implements Repeat.RepeatSignal {
29+
final class ImmutableRepeatSignal implements RepeatSpec.RepeatSignal {
3030

3131
private final long iteration;
3232
private final Long companionValue;
@@ -64,7 +64,7 @@ public ContextView repeatContextView() {
6464
}
6565

6666
@Override
67-
public Repeat.RepeatSignal copy() {
67+
public RepeatSpec.RepeatSignal copy() {
6868
return this;
6969
}
7070

reactor-core/src/main/java/reactor/util/repeat/Repeat.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

reactor-core/src/main/java/reactor/util/repeat/RepeatSpec.java

Lines changed: 95 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.ThreadLocalRandom;
2121
import java.util.function.Consumer;
22+
import java.util.function.Function;
2223
import java.util.function.Predicate;
2324

2425
import org.reactivestreams.Publisher;
@@ -46,17 +47,22 @@
4647
* <li>Context propagation via {@link #withRepeatContext(ContextView)}</li>
4748
* </ul>
4849
* This strategy does not retry based on error signals, but rather repeats a successful sequence (until termination condition is met).
50+
* <p>
51+
* The companion {@code Flux<Long>} represents repeat signals, and each value corresponds to a repeat attempt.
52+
* This class transforms the repeat signals into a {@link Publisher} that determines whether to trigger a repeat.
4953
*
5054
* @author Daeho Kwon
5155
*/
52-
public final class RepeatSpec extends Repeat {
56+
public final class RepeatSpec implements Function<Flux<Long>, Publisher<?>> {
5357

5458
private static final Predicate<RepeatSignal> ALWAYS_TRUE = __ -> true;
5559
private static final Consumer<RepeatSignal> NO_OP_CONSUMER = rs -> {
5660
};
5761
private static final RepeatSignal TERMINATE =
5862
new ImmutableRepeatSignal(-1, -1L, Duration.ZERO, Context.empty());
5963

64+
final ContextView repeatContext;
65+
6066
final long maxRepeats;
6167
final Predicate<RepeatSignal> repeatPredicate;
6268
final Consumer<RepeatSignal> beforeRepeatHook;
@@ -76,7 +82,7 @@ private RepeatSpec(ContextView repeatContext,
7682
Duration fixedDelay,
7783
Scheduler scheduler,
7884
double jitterFactor) {
79-
super(repeatContext);
85+
this.repeatContext = repeatContext;
8086
this.maxRepeats = maxRepeats;
8187
this.repeatPredicate = repeatPredicate;
8288
this.beforeRepeatHook = beforeRepeatHook;
@@ -135,12 +141,12 @@ public static RepeatSpec create(Predicate<RepeatSignal> predicate, long n) {
135141
}
136142

137143
/**
138-
* Set the user provided {@link Repeat#repeatContext() context} that can be used to
144+
* Set the user provided {@link ContextView contextView} that can be used to
139145
* manipulate state on retries.
140146
*
141147
* @param repeatContext a new snapshot of user provided data
142-
* @return a new copy of the {@link RepeatSpec} which can either be further configured
143-
* or used as {@link Repeat}
148+
* @return a new copy of the {@link RepeatSpec} which can either be further
149+
* configured.
144150
*/
145151
public RepeatSpec withRepeatContext(ContextView repeatContext) {
146152
return new RepeatSpec(repeatContext,
@@ -260,44 +266,50 @@ public RepeatSpec withScheduler(Scheduler scheduler) {
260266
}
261267

262268
@Override
269+
public Publisher<?> apply(Flux<Long> signals) {
270+
return generateCompanion(signals);
271+
}
272+
273+
/**
274+
* Generates the companion publisher responsible for reacting to incoming repeat signals,
275+
* effectively deciding whether to trigger another repeat cycle.
276+
*
277+
* @param signals the incoming repeat signals, where each {@link Long} value indicates the iteration index
278+
* @return the companion publisher that determines if a repeat should occur
279+
*/
263280
public Publisher<Long> generateCompanion(Flux<Long> signals) {
264-
return Flux.deferContextual(cv -> signals.index()
265-
.contextWrite(cv)
266-
.map(tuple -> {
267-
long iter = tuple.getT1();
268-
long companionValue = tuple.getT2();
269-
270-
RepeatSignal signal =
271-
new ImmutableRepeatSignal(
272-
iter,
273-
companionValue,
274-
calculateDelay(),
275-
repeatContext());
276-
277-
if (iter >= maxRepeats || !repeatPredicate.test(
278-
signal)) {
279-
return TERMINATE;
280-
}
281-
return signal;
282-
})
283-
.takeWhile(signal -> signal != TERMINATE)
284-
.concatMap(signal -> {
285-
try {
286-
beforeRepeatHook.accept(signal);
287-
}
288-
catch (Throwable e) {
289-
return Mono.error(e);
290-
}
291-
292-
Duration delay = signal.backoff();
293-
Mono<Long> trigger = delay.isZero() ?
294-
Mono.just(signal.companionValue()) :
295-
Mono.delay(delay, scheduler)
296-
.thenReturn(signal.companionValue());
297-
298-
return trigger.doOnSuccess(v -> afterRepeatHook.accept(
299-
signal));
300-
}));
281+
return signals.index()
282+
.map(tuple -> {
283+
long iter = tuple.getT1();
284+
long companionValue = tuple.getT2();
285+
286+
RepeatSignal signal = new ImmutableRepeatSignal(iter,
287+
companionValue,
288+
calculateDelay(),
289+
this.repeatContext);
290+
291+
if (iter >= maxRepeats || !repeatPredicate.test(signal)) {
292+
return TERMINATE;
293+
}
294+
return signal;
295+
})
296+
.takeWhile(signal -> signal != TERMINATE)
297+
.concatMap(signal -> {
298+
try {
299+
beforeRepeatHook.accept(signal);
300+
}
301+
catch (Throwable e) {
302+
return Mono.error(e);
303+
}
304+
305+
Duration delay = signal.backoff();
306+
Mono<Long> trigger =
307+
delay.isZero() ? Mono.just(signal.companionValue()) :
308+
Mono.delay(delay, scheduler)
309+
.thenReturn(signal.companionValue());
310+
311+
return trigger.doOnSuccess(v -> afterRepeatHook.accept(signal));
312+
});
301313
}
302314

303315
Duration calculateDelay() {
@@ -311,4 +323,45 @@ Duration calculateDelay() {
311323
}
312324
return actual;
313325
}
326+
327+
/**
328+
* State information associated with each repeat signal, used in repeat strategies.
329+
*/
330+
public interface RepeatSignal {
331+
332+
/**
333+
* Returns the current iteration count, starting from 0.
334+
*
335+
* @return the iteration index
336+
*/
337+
long iteration();
338+
339+
/**
340+
* Returns the value from the companion publisher that triggered this repeat signal.
341+
*
342+
* @return the companion value
343+
*/
344+
Long companionValue();
345+
346+
/**
347+
* Returns the delay before the next repeat attempt.
348+
*
349+
* @return the backoff duration
350+
*/
351+
Duration backoff();
352+
353+
/**
354+
* Returns the read-only context associated with this repeat signal.
355+
*
356+
* @return the repeat context view
357+
*/
358+
ContextView repeatContextView();
359+
360+
/**
361+
* Returns an immutable copy of this {@link RepeatSignal}, capturing the current state.
362+
*
363+
* @return an immutable copy of the signal
364+
*/
365+
RepeatSignal copy();
366+
}
314367
}

reactor-core/src/test/java/reactor/util/repeat/RepeatSpecTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package reactor.util.repeat;
1818

1919
import java.time.Duration;
20+
import java.time.Instant;
2021
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import org.junit.jupiter.api.Test;
@@ -50,10 +51,10 @@ void onlyIfReplacesPredicate() {
5051
.onlyIf(signal -> signal.iteration() < 5)
5152
.onlyIf(signal -> signal.iteration() == 0);
5253

53-
Repeat.RepeatSignal acceptSignal =
54+
RepeatSpec.RepeatSignal acceptSignal =
5455
new ImmutableRepeatSignal(0, 123L, Duration.ofMillis(0), Context.empty());
5556

56-
Repeat.RepeatSignal rejectSignal =
57+
RepeatSpec.RepeatSignal rejectSignal =
5758
new ImmutableRepeatSignal(4, 123L, Duration.ofMillis(0), Context.empty());
5859

5960
assertThat(repeatSpec.repeatPredicate).accepts(acceptSignal)
@@ -64,7 +65,7 @@ void onlyIfReplacesPredicate() {
6465
void doBeforeRepeatIsCumulative() {
6566
AtomicInteger counter = new AtomicInteger();
6667

67-
Repeat.RepeatSignal dummySignal =
68+
RepeatSpec.RepeatSignal dummySignal =
6869
new ImmutableRepeatSignal(0, 0L, Duration.ZERO, Context.empty());
6970

7071
RepeatSpec repeatSpec = RepeatSpec.times(1)
@@ -80,7 +81,7 @@ void doBeforeRepeatIsCumulative() {
8081
void doAfterRepeatIsCumulative() {
8182
AtomicInteger counter = new AtomicInteger();
8283

83-
Repeat.RepeatSignal dummySignal =
84+
RepeatSpec.RepeatSignal dummySignal =
8485
new ImmutableRepeatSignal(0, 0L, Duration.ZERO, Context.empty());
8586

8687
RepeatSpec repeatSpec = RepeatSpec.times(1)
@@ -170,16 +171,16 @@ void jitterProducesDelayInExpectedRange() {
170171

171172
@Test
172173
void repeatContextCanInfluencePredicate() {
173-
RepeatSpec repeatSpec = RepeatSpec.times(5)
174-
.withRepeatContext(Context.of("stopAfterOne",
175-
true))
176-
.onlyIf(signal -> {
177-
boolean stopAfterOne =
178-
signal.repeatContextView()
179-
.getOrDefault("stopAfterOne",
180-
false);
181-
return !stopAfterOne || signal.iteration() == 0;
182-
});
174+
RepeatSpec repeatSpec =
175+
RepeatSpec.times(5)
176+
.withRepeatContext(Context.of("stopAfterOne", true))
177+
.onlyIf(signal -> {
178+
Boolean contextEntry =
179+
signal.repeatContextView()
180+
.getOrDefault("stopAfterOne", Boolean.FALSE);
181+
boolean stopAfterOne = Boolean.TRUE.equals(contextEntry);
182+
return !stopAfterOne || signal.iteration() == 0;
183+
});
183184

184185
AtomicInteger subscriptionCount = new AtomicInteger();
185186
Flux<String> source = Flux.defer(() -> {

0 commit comments

Comments
 (0)