Skip to content

Commit 2c83ef0

Browse files
authored
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)
* Allow the queued byte threshold for a Stream to be ready to be configurable - on clients this is exposed by setting a CallOption - on servers this is configured by calling a method on ServerCall or ServerStreamListener
1 parent 68eb639 commit 2c83ef0

File tree

26 files changed

+330
-21
lines changed

26 files changed

+330
-21
lines changed

api/src/main/java/io/grpc/CallOptions.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ public final class CallOptions {
7979
private final Integer maxInboundMessageSize;
8080
@Nullable
8181
private final Integer maxOutboundMessageSize;
82+
@Nullable
83+
private final Integer onReadyThreshold;
8284

8385
private CallOptions(Builder builder) {
8486
this.deadline = builder.deadline;
@@ -91,6 +93,7 @@ private CallOptions(Builder builder) {
9193
this.waitForReady = builder.waitForReady;
9294
this.maxInboundMessageSize = builder.maxInboundMessageSize;
9395
this.maxOutboundMessageSize = builder.maxOutboundMessageSize;
96+
this.onReadyThreshold = builder.onReadyThreshold;
9497
}
9598

9699
static class Builder {
@@ -105,6 +108,7 @@ static class Builder {
105108
Boolean waitForReady;
106109
Integer maxInboundMessageSize;
107110
Integer maxOutboundMessageSize;
111+
Integer onReadyThreshold;
108112

109113
private CallOptions build() {
110114
return new CallOptions(this);
@@ -203,6 +207,46 @@ public CallOptions withoutWaitForReady() {
203207
return builder.build();
204208
}
205209

210+
/**
211+
* Specifies how many bytes must be queued before the call is
212+
* considered not ready to send more messages.
213+
*
214+
* @param numBytes The number of bytes that must be queued. Must be a
215+
* positive integer.
216+
*/
217+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
218+
public CallOptions withOnReadyThreshold(int numBytes) {
219+
checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes);
220+
Builder builder = toBuilder(this);
221+
builder.onReadyThreshold = numBytes;
222+
return builder.build();
223+
}
224+
225+
/**
226+
* Resets to the default number of bytes that must be queued before the
227+
* call will leave the <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
228+
* 'wait for ready'</a> state.
229+
*/
230+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
231+
public CallOptions clearOnReadyThreshold() {
232+
Builder builder = toBuilder(this);
233+
builder.onReadyThreshold = null;
234+
return builder.build();
235+
}
236+
237+
/**
238+
* Returns to the default number of bytes that must be queued before the
239+
* call will leave the <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
240+
* 'wait for ready'</a> state.
241+
*
242+
* @return null if the default threshold is used.
243+
*/
244+
@Nullable
245+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
246+
public Integer getOnReadyThreshold() {
247+
return onReadyThreshold;
248+
}
249+
206250
/**
207251
* Returns the compressor's name.
208252
*/

api/src/main/java/io/grpc/PartialForwardingServerCall.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ public void setMessageCompression(boolean enabled) {
5858
delegate().setMessageCompression(enabled);
5959
}
6060

61+
@Override
62+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
63+
public void setOnReadyThreshold(int numBytes) {
64+
delegate().setOnReadyThreshold(numBytes);
65+
}
66+
6167
@Override
6268
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
6369
public void setCompression(String compressor) {

api/src/main/java/io/grpc/ServerCall.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
20+
1921
import javax.annotation.Nullable;
2022

2123
/**
@@ -209,6 +211,19 @@ public void setCompression(String compressor) {
209211
// noop
210212
}
211213

214+
/**
215+
* A hint to the call that specifies how many bytes must be queued before
216+
* {@link #isReady()} will return false. A call may ignore this property if
217+
* unsupported. This may only be set before any messages are sent.
218+
*
219+
* @param numBytes The number of bytes that must be queued. Must be a
220+
* positive integer.
221+
*/
222+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
223+
public void setOnReadyThreshold(int numBytes) {
224+
checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes);
225+
}
226+
212227
/**
213228
* Returns the level of security guarantee in communications
214229
*

binder/src/main/java/io/grpc/binder/internal/MultiMessageServerStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public void setListener(ServerStreamListener listener) {
6464
}
6565
}
6666

67+
@Override
68+
public void setOnReadyThreshold(int numBytes) {
69+
// No-op
70+
}
71+
6772
@Override
6873
public boolean isReady() {
6974
return outbound.isReady();

binder/src/main/java/io/grpc/binder/internal/SingleMessageServerStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public void setListener(ServerStreamListener listener) {
6767
}
6868
}
6969

70+
@Override
71+
public void setOnReadyThreshold(int numBytes) {
72+
// No-op
73+
}
74+
7075
@Override
7176
public boolean isReady() {
7277
return outbound.isReady();

core/src/main/java/io/grpc/internal/AbstractClientStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,13 @@ protected abstract static class TransportState extends AbstractStream.TransportS
243243
protected TransportState(
244244
int maxMessageSize,
245245
StatsTraceContext statsTraceCtx,
246-
TransportTracer transportTracer) {
246+
TransportTracer transportTracer,
247+
CallOptions options) {
247248
super(maxMessageSize, statsTraceCtx, transportTracer);
248249
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
250+
if (options.getOnReadyThreshold() != null) {
251+
this.setOnReadyThreshold(options.getOnReadyThreshold());
252+
}
249253
}
250254

251255
private void setFullStreamDecompression(boolean fullStreamDecompression) {

core/src/main/java/io/grpc/internal/AbstractServerStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,19 @@ public StatsTraceContext statsTraceContext() {
177177
return statsTraceCtx;
178178
}
179179

180+
/**
181+
* A hint to the stream that specifies how many bytes must be queued before
182+
* {@link #isReady()} will return false. A stream may ignore this property
183+
* if unsupported. This may only be set before any messages are sent.
184+
*
185+
* @param numBytes The number of bytes that must be queued. Must be a
186+
* positive integer.
187+
*/
188+
@Override
189+
public void setOnReadyThreshold(int numBytes) {
190+
super.setOnReadyThreshold(numBytes);
191+
}
192+
180193
/**
181194
* This should only be called from the transport thread (except for private interactions with
182195
* {@code AbstractServerStream}).
@@ -243,6 +256,8 @@ public void deframerClosed(boolean hasPartialMessage) {
243256
}
244257
}
245258

259+
260+
246261
@Override
247262
protected ServerStreamListener listener() {
248263
return listener;

core/src/main/java/io/grpc/internal/AbstractStream.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ public final void flush() {
7777
}
7878
}
7979

80+
/**
81+
* A hint to the stream that specifies how many bytes must be queued before
82+
* {@link #isReady()} will return false. A stream may ignore this property if
83+
* unsupported. This may only be set during stream initialization before
84+
* any messages are set.
85+
*
86+
* @param numBytes The number of bytes that must be queued. Must be a
87+
* positive integer.
88+
*/
89+
protected void setOnReadyThreshold(int numBytes) {
90+
transportState().setOnReadyThreshold(numBytes);
91+
}
92+
8093
/**
8194
* Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
8295
* (half closure on client; closure on server).
@@ -143,6 +156,9 @@ public abstract static class TransportState
143156
@GuardedBy("onReadyLock")
144157
private boolean deallocated;
145158

159+
@GuardedBy("onReadyLock")
160+
private int onReadyThreshold;
161+
146162
protected TransportState(
147163
int maxMessageSize,
148164
StatsTraceContext statsTraceCtx,
@@ -157,6 +173,7 @@ protected TransportState(
157173
transportTracer);
158174
// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
159175
deframer = rawDeframer;
176+
onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
160177
}
161178

162179
final void optimizeForDirectExecutor() {
@@ -178,6 +195,20 @@ final void setMaxInboundMessageSize(int maxSize) {
178195
*/
179196
protected abstract StreamListener listener();
180197

198+
/**
199+
* A hint to the stream that specifies how many bytes must be queued before
200+
* {@link #isReady()} will return false. A stream may ignore this property if
201+
* unsupported. This may only be set before any messages are sent.
202+
*
203+
* @param numBytes The number of bytes that must be queued. Must be a
204+
* positive integer.
205+
*/
206+
void setOnReadyThreshold(int numBytes) {
207+
synchronized (onReadyLock) {
208+
this.onReadyThreshold = numBytes;
209+
}
210+
}
211+
181212
@Override
182213
public void messagesAvailable(StreamListener.MessageProducer producer) {
183214
listener().messagesAvailable(producer);
@@ -259,7 +290,7 @@ protected final void setDecompressor(Decompressor decompressor) {
259290

260291
private boolean isReady() {
261292
synchronized (onReadyLock) {
262-
return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
293+
return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
263294
}
264295
}
265296

@@ -316,9 +347,9 @@ public final void onSentBytes(int numBytes) {
316347
synchronized (onReadyLock) {
317348
checkState(allocated,
318349
"onStreamAllocated was not called, but it seems the stream is active");
319-
boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
350+
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
320351
numSentBytesQueued -= numBytes;
321-
boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
352+
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
322353
doNotify = !belowThresholdBefore && belowThresholdAfter;
323354
}
324355
if (doNotify) {

core/src/main/java/io/grpc/internal/Http2ClientStreamTransportState.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.common.base.Charsets;
2020
import com.google.common.base.Preconditions;
21+
import io.grpc.CallOptions;
2122
import io.grpc.InternalMetadata;
2223
import io.grpc.InternalStatus;
2324
import io.grpc.Metadata;
@@ -67,8 +68,9 @@ public Integer parseAsciiString(byte[] serialized) {
6768
protected Http2ClientStreamTransportState(
6869
int maxMessageSize,
6970
StatsTraceContext statsTraceCtx,
70-
TransportTracer transportTracer) {
71-
super(maxMessageSize, statsTraceCtx, transportTracer);
71+
TransportTracer transportTracer,
72+
CallOptions options) {
73+
super(maxMessageSize, statsTraceCtx, transportTracer, options);
7274
}
7375

7476
/**

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ public void setMessageCompression(boolean enable) {
184184
stream.setMessageCompression(enable);
185185
}
186186

187+
@Override
188+
public void setOnReadyThreshold(int numBytes) {
189+
stream.setOnReadyThreshold(numBytes);
190+
}
191+
187192
@Override
188193
public void setCompression(String compressorName) {
189194
// Added here to give a better error message.

0 commit comments

Comments
 (0)