Skip to content

Commit 755f293

Browse files
authored
changes from proto updates + read session heartbeat (#14)
1 parent 04f8163 commit 755f293

File tree

8 files changed

+95
-22
lines changed

8 files changed

+95
-22
lines changed

app/src/main/java/org/example/app/ManagedReadSessionDemo.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import s2.config.Config;
1111
import s2.config.Endpoints;
1212
import s2.types.Batch;
13-
import s2.types.ReadLimit;
1413
import s2.types.ReadSessionRequest;
1514

1615
public class ManagedReadSessionDemo {
@@ -50,7 +49,7 @@ public static void main(String[] args) throws Exception {
5049

5150
try (final var managedSession =
5251
streamClient.managedReadSession(
53-
ReadSessionRequest.newBuilder().withReadLimit(ReadLimit.count(100_000)).build(),
52+
ReadSessionRequest.newBuilder().withHeartbeats(true).build(),
5453
1024 * 1024 * 1024 * 5)) {
5554

5655
AtomicLong receivedBytes = new AtomicLong();

app/src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</encoder>
88
</appender>
99

10-
<logger additivity="false" level="info" name="s2">
10+
<logger additivity="false" level="trace" name="s2">
1111
<appender-ref ref="console"/>
1212
</logger>
1313

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.0.14-SNAPSHOT
1+
version=0.0.14

s2-sdk/src/main/java/s2/client/ReadSession.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.grpc.Status;
77
import io.grpc.stub.StreamObserver;
88
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.concurrent.TimeUnit;
911
import java.util.concurrent.atomic.AtomicInteger;
1012
import java.util.concurrent.atomic.AtomicLong;
1113
import java.util.function.Consumer;
@@ -20,6 +22,8 @@ public class ReadSession implements AutoCloseable {
2022

2123
private static final Logger logger = LoggerFactory.getLogger(ReadSession.class.getName());
2224

25+
private static final Long HEARTBEAT_THRESHOLD_NANOS = TimeUnit.SECONDS.toNanos(20);
26+
2327
final ScheduledExecutorService executor;
2428
final StreamClient client;
2529

@@ -28,6 +32,10 @@ public class ReadSession implements AutoCloseable {
2832
final AtomicLong consumedBytes = new AtomicLong(0);
2933
final AtomicInteger remainingAttempts;
3034

35+
// Liveness timer.
36+
final AtomicLong lastEvent;
37+
final ListenableFuture<Void> livenessDaemon;
38+
3139
final Consumer<ReadOutput> onResponse;
3240
final Consumer<Throwable> onError;
3341

@@ -46,6 +54,9 @@ public class ReadSession implements AutoCloseable {
4654
this.request = request;
4755
this.nextStartSeqNum = new AtomicLong(request.startSeqNum);
4856
this.remainingAttempts = new AtomicInteger(client.config.maxRetries);
57+
this.lastEvent = new AtomicLong(System.nanoTime());
58+
59+
this.livenessDaemon = request.heartbeats ? livenessDaemon() : Futures.immediateFuture(null);
4960
this.daemon = this.retrying();
5061
}
5162

@@ -60,7 +71,12 @@ private ListenableFuture<Void> readSessionInner(
6071

6172
@Override
6273
public void onNext(ReadSessionResponse value) {
63-
innerOnResponse.accept(ReadOutput.fromProto(value.getOutput()));
74+
lastEvent.set(System.nanoTime());
75+
if (value.hasOutput()) {
76+
innerOnResponse.accept(ReadOutput.fromProto(value.getOutput()));
77+
} else {
78+
logger.trace("heartbeat");
79+
}
6480
}
6581

6682
@Override
@@ -72,12 +88,48 @@ public void onError(Throwable t) {
7288
@Override
7389
public void onCompleted() {
7490
logger.debug("Read session inner onCompleted");
91+
livenessDaemon.cancel(true);
7592
fut.set(null);
7693
}
7794
});
7895
return fut;
7996
}
8097

98+
private ListenableFuture<Void> livenessDaemon() {
99+
SettableFuture<Void> livenessFuture = SettableFuture.create();
100+
scheduleLivenessCheck(livenessFuture);
101+
return livenessFuture;
102+
}
103+
104+
private void scheduleLivenessCheck(SettableFuture<Void> livenessFuture) {
105+
final long delay = (lastEvent.get() + HEARTBEAT_THRESHOLD_NANOS) - System.nanoTime();
106+
107+
logger.trace(
108+
"Checking liveness. Next deadline: {} seconds.",
109+
TimeUnit.SECONDS.convert(delay, TimeUnit.NANOSECONDS));
110+
if (delay <= 0) {
111+
this.onError.accept(
112+
Status.DEADLINE_EXCEEDED
113+
.withDescription("ReadSession hit local heartbeat deadline")
114+
.asRuntimeException());
115+
this.daemon.cancel(true);
116+
livenessFuture.set(null);
117+
} else {
118+
ScheduledFuture<?> scheduledCheck =
119+
executor.schedule(
120+
() -> {
121+
if (livenessFuture.isDone()) {
122+
return;
123+
}
124+
scheduleLivenessCheck(livenessFuture);
125+
},
126+
delay,
127+
TimeUnit.NANOSECONDS);
128+
129+
livenessFuture.addListener(() -> scheduledCheck.cancel(true), executor);
130+
}
131+
}
132+
81133
private ListenableFuture<Void> retrying() {
82134

83135
return Futures.catchingAsync(
@@ -107,6 +159,7 @@ private ListenableFuture<Void> retrying() {
107159
} else {
108160
logger.warn("readSession failed, status={}", status.getCode());
109161
onError.accept(t);
162+
this.livenessDaemon.cancel(true);
110163
return Futures.immediateFuture(null);
111164
}
112165
},
@@ -119,6 +172,7 @@ public ListenableFuture<Void> awaitCompletion() {
119172

120173
@Override
121174
public void close() {
175+
this.livenessDaemon.cancel(true);
122176
this.daemon.cancel(true);
123177
}
124178
}

s2-sdk/src/main/java/s2/types/BasinInfo.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,29 @@
22

33
public class BasinInfo {
44
public final String name;
5-
public final String scope;
6-
public final String cell;
5+
public final BasinScope basinScope;
76
public final BasinState basinState;
87

9-
BasinInfo(String name, String scope, String cell, BasinState basinState) {
8+
BasinInfo(String name, BasinScope basinScope, BasinState basinState) {
109
this.name = name;
11-
this.scope = scope;
12-
this.cell = cell;
10+
this.basinScope = basinScope;
1311
this.basinState = basinState;
1412
}
1513

1614
public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) {
15+
BasinScope scope;
16+
switch (basinInfo.getScope()) {
17+
case BASIN_SCOPE_UNSPECIFIED:
18+
scope = BasinScope.UNSPECIFIED;
19+
break;
20+
case BASIN_SCOPE_AWS_US_EAST_1:
21+
scope = BasinScope.AWS_US_EAST_1;
22+
break;
23+
default:
24+
scope = BasinScope.UNKNOWN;
25+
break;
26+
}
27+
1728
BasinState state;
1829
switch (basinInfo.getState()) {
1930
case BASIN_STATE_UNSPECIFIED:
@@ -33,6 +44,6 @@ public static BasinInfo fromProto(s2.v1alpha.BasinInfo basinInfo) {
3344
break;
3445
}
3546

36-
return new BasinInfo(basinInfo.getName(), basinInfo.getScope(), basinInfo.getCell(), state);
47+
return new BasinInfo(basinInfo.getName(), scope, state);
3748
}
3849
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package s2.types;
2+
3+
public enum BasinScope {
4+
UNSPECIFIED,
5+
AWS_US_EAST_1,
6+
UNKNOWN
7+
}

s2-sdk/src/main/java/s2/types/ReadSessionRequest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ public class ReadSessionRequest {
66

77
public final long startSeqNum;
88
public final ReadLimit readLimit;
9+
public final boolean heartbeats;
910

10-
protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit) {
11+
protected ReadSessionRequest(long startSeqNum, ReadLimit readLimit, boolean heartbeats) {
1112
this.startSeqNum = startSeqNum;
1213
this.readLimit = readLimit;
14+
this.heartbeats = heartbeats;
1315
}
1416

1517
public static ReadSessionRequestBuilder newBuilder() {
@@ -18,27 +20,22 @@ public static ReadSessionRequestBuilder newBuilder() {
1820

1921
public ReadSessionRequest update(long newStartSeqNum, long consumedRecords, long consumedBytes) {
2022
return new ReadSessionRequest(
21-
newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes));
22-
}
23-
24-
public s2.v1alpha.ReadSessionRequest toProto() {
25-
return s2.v1alpha.ReadSessionRequest.newBuilder()
26-
.setStartSeqNum(startSeqNum)
27-
.setLimit(readLimit.toProto())
28-
.build();
23+
newStartSeqNum, readLimit.remaining(consumedRecords, consumedBytes), heartbeats);
2924
}
3025

3126
public s2.v1alpha.ReadSessionRequest toProto(String streamName) {
3227
return s2.v1alpha.ReadSessionRequest.newBuilder()
3328
.setStream(streamName)
3429
.setStartSeqNum(startSeqNum)
3530
.setLimit(readLimit.toProto())
31+
.setHeartbeats(heartbeats)
3632
.build();
3733
}
3834

3935
public static class ReadSessionRequestBuilder {
4036
private Optional<Long> startSeqNum = Optional.empty();
4137
private Optional<ReadLimit> readLimit = Optional.empty();
38+
private boolean heartbeats = false;
4239

4340
public ReadSessionRequestBuilder withStartSeqNum(long startSeqNum) {
4441
this.startSeqNum = Optional.of(startSeqNum);
@@ -50,9 +47,14 @@ public ReadSessionRequestBuilder withReadLimit(ReadLimit readLimit) {
5047
return this;
5148
}
5249

50+
public ReadSessionRequestBuilder withHeartbeats(boolean heartbeats) {
51+
this.heartbeats = heartbeats;
52+
return this;
53+
}
54+
5355
public ReadSessionRequest build() {
5456
return new ReadSessionRequest(
55-
this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE));
57+
this.startSeqNum.orElse(0L), this.readLimit.orElse(ReadLimit.NONE), this.heartbeats);
5658
}
5759
}
5860
}

0 commit comments

Comments
 (0)